Skip to content

Commit

Permalink
driver/sigrok: Add sigrok driver for DMMs
Browse files Browse the repository at this point in the history
The SigrokDmmDriver wraps around a single channel DMM controlled by sigrok.
It has been tested with Unit-T UT61C and UT61B devices but probably also
works with other single chnnel DMMs.

This driver binds to a SigrokUsbDevice.
Make sure to select the correct driver for your DMM there.

Example usage:
> resources:
>   - SigrokUSBDevice:
>       driver: uni-t-ut61c
>       match:
>         'ID_PATH': pci-0000:07:00.4-usb-0:2:1.0
> drivers:
>   - SigrokDmmDriver: {}

Args:
    bindings (dict): driver to use with sigrok

Signed-off-by: Chris Fiege <[email protected]>
  • Loading branch information
SmithChart committed Jun 12, 2023
1 parent 3dc17a9 commit 61c3bd3
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 1 deletion.
33 changes: 33 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2444,6 +2444,39 @@ Arguments:
- max_current (float): optional, maximum allowed current for protection against
accidental damage (in ampere)

SigrokDmmDriver
~~~~~~~~~~~~~~~
The `SigrokDmmDriver` uses a `SigrokDevice` resource to record samples from a digital multimeter (DMM) and provides
them during test runs.

It is known to work with Unit-T `UT61B` and `UT61C` devices but should also work with other DMMs supported by *sigrok*.

Binds to:
sigrok:
- `SigrokUSBDevice`_
- `SigrokUSBSerialDevice`_
- `NetworkSigrokUSBDevice`_
- NetworkSigrokUSBSerialDevice

Implements:
- None yet

Arguments:
- None

Sampling can be started calling `start(samples, timeout=None)`.
It sets up sampling and returns immediately.
If no `timeout` is provided the timeout will be set to something reasonable.

Samples can be obtained using `get_samples()`. `get_samples()` will block until either *sigrok* terminates or `timeout`
is reached.
This method returns a `(unit, samples)` tuple:
`unit` is the physical unit reported by the DMM;
samples is an iterable of samples.

This driver relies on buffering of the subprocess call.
Reading a few samples will very likely work - but obtaining a lot of samples may stall.

USBSDMuxDriver
~~~~~~~~~~~~~~
The :any:`USBSDMuxDriver` uses a USBSDMuxDevice resource to control a
Expand Down
2 changes: 1 addition & 1 deletion labgrid/driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from .qemudriver import QEMUDriver
from .modbusdriver import ModbusCoilDriver
from .modbusrtudriver import ModbusRTUDriver
from .sigrokdriver import SigrokDriver, SigrokPowerDriver
from .sigrokdriver import SigrokDriver, SigrokPowerDriver, SigrokDmmDriver
from .usbstoragedriver import USBStorageDriver, NetworkUSBStorageDriver, Mode
from .resetdriver import DigitalOutputResetDriver
from .gpiodriver import GpioDigitalOutputDriver
Expand Down
92 changes: 92 additions & 0 deletions labgrid/driver/sigrokdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .common import Driver, check_file
from .exception import ExecutionError
from .powerdriver import PowerResetMixin
from ..util import Timeout


@attr.s(eq=False)
Expand Down Expand Up @@ -329,3 +330,94 @@ def measure(self):
if len(res) != 2:
raise ExecutionError(f"Cannot parse --show output {out}")
return res

@target_factory.reg_driver
@attr.s(eq=False)
class SigrokDmmDriver(SigrokCommon):
"""
This driver wraps around a single channel DMM controlled by sigrok.
It has been tested with Unit-T UT61C and UT61B devices but probably also
works with other single chnnel DMMs.
This driver binds to a SigrokUsbDevice.
Make sure to select the correct driver for your DMM there.
Example usage:
> resources:
> - SigrokUSBDevice:
> driver: uni-t-ut61c
> match:
> 'ID_PATH': pci-0000:07:00.4-usb-0:2:1.0
> drivers:
> - SigrokDmmDriver: {}
Args:
bindings (dict): driver to use with sigrok
"""
bindings = {
"sigrok": {SigrokUSBSerialDevice, NetworkSigrokUSBSerialDevice, SigrokUSBDevice, NetworkSigrokUSBDevice},
}

@Driver.check_active
@step(result=True)
def start(self, samples, timeout=None):
"""
Starts to read samples from the DMM.
This function returns once sampling has been started. Sampling continues in the background.
Note: We use subprocess.PIPE to buffer the samples. When this buffer is too small for the number of
sampled requested sampling may stall.
Args:
samples: Number of samples to obtain
timeout: Timeout after which sampling should be stopped.
If None: timemout[s] = samples * 1s + 5s
If int: Timeout in [s]
Returns: None
"""
if not timeout:
timeout = samples + 5.0

args = f"-O csv --samples {samples}".split(" ")
self._call_with_driver(*args)
self._timeout = Timeout(timeout)
self._running = True

@Driver.check_active
@step(result=True)
def get_samples(self):
"""
Waits for sigrok to complete and returns all samples obtained afterwards.
This function blocks until either sigrok has terminated or the timeout has been reached.
Returns:
(unit_spec, [sample, ...])
"""
if not self._running:
raise RuntimeError("Capture was not started before get_samples was called.")
while not self._timeout.expired:
if self._process.poll() is not None:
# process has finished. no need to wait for the timeout
break
time.sleep(0.1)
else:
# process did not finish in time
self.log.info("sigrok-cli did not finish in time, increase timeout?")
self._process.kill()

res = []
unit = ""
for line in self._process.stdout.readlines():
line = line.strip()
if b';' in line:
# discard header information
continue
if not unit:
# the first line after the header contains the unit information
unit = line.decode()
else:
# all other lines are actual values
res.append(float(line))
self._process.communicate()
return unit, res

0 comments on commit 61c3bd3

Please sign in to comment.