From 61c3bd3a2e338582b9ad8f6eaca555dd9241edeb Mon Sep 17 00:00:00 2001 From: Chris Fiege Date: Thu, 20 Apr 2023 15:26:59 +0200 Subject: [PATCH] driver/sigrok: Add sigrok driver for DMMs The SigrokDmmDriver wraps around a single channel DMM controlled by sigrok. It has been tested with Unit-T UT61C and UT61B devices but probably also works with other single chnnel DMMs. This driver binds to a SigrokUsbDevice. Make sure to select the correct driver for your DMM there. Example usage: > resources: > - SigrokUSBDevice: > driver: uni-t-ut61c > match: > 'ID_PATH': pci-0000:07:00.4-usb-0:2:1.0 > drivers: > - SigrokDmmDriver: {} Args: bindings (dict): driver to use with sigrok Signed-off-by: Chris Fiege --- doc/configuration.rst | 33 ++++++++++++ labgrid/driver/__init__.py | 2 +- labgrid/driver/sigrokdriver.py | 92 ++++++++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 1 deletion(-) diff --git a/doc/configuration.rst b/doc/configuration.rst index f859dbec8..4d226732c 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -2444,6 +2444,39 @@ Arguments: - max_current (float): optional, maximum allowed current for protection against accidental damage (in ampere) +SigrokDmmDriver +~~~~~~~~~~~~~~~ +The `SigrokDmmDriver` uses a `SigrokDevice` resource to record samples from a digital multimeter (DMM) and provides +them during test runs. + +It is known to work with Unit-T `UT61B` and `UT61C` devices but should also work with other DMMs supported by *sigrok*. + +Binds to: + sigrok: + - `SigrokUSBDevice`_ + - `SigrokUSBSerialDevice`_ + - `NetworkSigrokUSBDevice`_ + - NetworkSigrokUSBSerialDevice + +Implements: + - None yet + +Arguments: + - None + +Sampling can be started calling `start(samples, timeout=None)`. +It sets up sampling and returns immediately. +If no `timeout` is provided the timeout will be set to something reasonable. + +Samples can be obtained using `get_samples()`. `get_samples()` will block until either *sigrok* terminates or `timeout` +is reached. +This method returns a `(unit, samples)` tuple: +`unit` is the physical unit reported by the DMM; +samples is an iterable of samples. + +This driver relies on buffering of the subprocess call. +Reading a few samples will very likely work - but obtaining a lot of samples may stall. + USBSDMuxDriver ~~~~~~~~~~~~~~ The :any:`USBSDMuxDriver` uses a USBSDMuxDevice resource to control a diff --git a/labgrid/driver/__init__.py b/labgrid/driver/__init__.py index 471eb0078..d26ad68f4 100644 --- a/labgrid/driver/__init__.py +++ b/labgrid/driver/__init__.py @@ -23,7 +23,7 @@ from .qemudriver import QEMUDriver from .modbusdriver import ModbusCoilDriver from .modbusrtudriver import ModbusRTUDriver -from .sigrokdriver import SigrokDriver, SigrokPowerDriver +from .sigrokdriver import SigrokDriver, SigrokPowerDriver, SigrokDmmDriver from .usbstoragedriver import USBStorageDriver, NetworkUSBStorageDriver, Mode from .resetdriver import DigitalOutputResetDriver from .gpiodriver import GpioDigitalOutputDriver diff --git a/labgrid/driver/sigrokdriver.py b/labgrid/driver/sigrokdriver.py index ded2edc82..a8cd5d0ad 100644 --- a/labgrid/driver/sigrokdriver.py +++ b/labgrid/driver/sigrokdriver.py @@ -23,6 +23,7 @@ from .common import Driver, check_file from .exception import ExecutionError from .powerdriver import PowerResetMixin +from ..util import Timeout @attr.s(eq=False) @@ -329,3 +330,94 @@ def measure(self): if len(res) != 2: raise ExecutionError(f"Cannot parse --show output {out}") return res + +@target_factory.reg_driver +@attr.s(eq=False) +class SigrokDmmDriver(SigrokCommon): + """ + This driver wraps around a single channel DMM controlled by sigrok. + It has been tested with Unit-T UT61C and UT61B devices but probably also + works with other single chnnel DMMs. + + This driver binds to a SigrokUsbDevice. + Make sure to select the correct driver for your DMM there. + + Example usage: + > resources: + > - SigrokUSBDevice: + > driver: uni-t-ut61c + > match: + > 'ID_PATH': pci-0000:07:00.4-usb-0:2:1.0 + > drivers: + > - SigrokDmmDriver: {} + + Args: + bindings (dict): driver to use with sigrok + """ + bindings = { + "sigrok": {SigrokUSBSerialDevice, NetworkSigrokUSBSerialDevice, SigrokUSBDevice, NetworkSigrokUSBDevice}, + } + + @Driver.check_active + @step(result=True) + def start(self, samples, timeout=None): + """ + Starts to read samples from the DMM. + This function returns once sampling has been started. Sampling continues in the background. + + Note: We use subprocess.PIPE to buffer the samples. When this buffer is too small for the number of + sampled requested sampling may stall. + + Args: + samples: Number of samples to obtain + timeout: Timeout after which sampling should be stopped. + If None: timemout[s] = samples * 1s + 5s + If int: Timeout in [s] + + Returns: None + """ + if not timeout: + timeout = samples + 5.0 + + args = f"-O csv --samples {samples}".split(" ") + self._call_with_driver(*args) + self._timeout = Timeout(timeout) + self._running = True + + @Driver.check_active + @step(result=True) + def get_samples(self): + """ + Waits for sigrok to complete and returns all samples obtained afterwards. + This function blocks until either sigrok has terminated or the timeout has been reached. + + Returns: + (unit_spec, [sample, ...]) + """ + if not self._running: + raise RuntimeError("Capture was not started before get_samples was called.") + while not self._timeout.expired: + if self._process.poll() is not None: + # process has finished. no need to wait for the timeout + break + time.sleep(0.1) + else: + # process did not finish in time + self.log.info("sigrok-cli did not finish in time, increase timeout?") + self._process.kill() + + res = [] + unit = "" + for line in self._process.stdout.readlines(): + line = line.strip() + if b';' in line: + # discard header information + continue + if not unit: + # the first line after the header contains the unit information + unit = line.decode() + else: + # all other lines are actual values + res.append(float(line)) + self._process.communicate() + return unit, res