diff --git a/doc/configuration.rst b/doc/configuration.rst index a4c83b040..af7cc8aee 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -2710,6 +2710,9 @@ Arguments: The driver can be used in test cases by calling its ``capture()``, ``stop()`` and ``analyze()`` methods. +To capture for a certain predetermined amount of time or number of samples, +methods ``capture_for_time()`` and ``capture_samples()`` can be used. + SigrokPowerDriver ~~~~~~~~~~~~~~~~~ The :any:`SigrokPowerDriver` uses a `SigrokUSBSerialDevice`_ resource to diff --git a/labgrid/driver/sigrokdriver.py b/labgrid/driver/sigrokdriver.py index 747f3ac66..7c29b52bb 100644 --- a/labgrid/driver/sigrokdriver.py +++ b/labgrid/driver/sigrokdriver.py @@ -92,7 +92,7 @@ def _get_sigrok_prefix(self): return self.sigrok.command_prefix + prefix @Driver.check_active - @step(title='call', args=['args']) + @step(title='call_with_driver', args=['args']) def _call_with_driver(self, *args): combined = self._get_sigrok_prefix() + list(args) self.logger.debug("Combined command: %s", " ".join(combined)) @@ -100,8 +100,30 @@ def _call_with_driver(self, *args): combined, stdout=subprocess.PIPE, stdin=subprocess.PIPE, - stderr=subprocess.PIPE + stderr=subprocess.PIPE, + text=True + ) + + @Driver.check_active + @step(title='call_with_driver_blocking', args=['args']) + def _call_with_driver_blocking(self, *args, log_output=False): + combined = self._get_sigrok_prefix() + list(args) + self.logger.debug("Combined command: %s", " ".join(combined)) + process = subprocess.Popen( + combined, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True ) + stdout, stderr = process.communicate() + if log_output: + self.logger.debug("stdout: %s", stdout) + self.logger.debug("stderr: %s", stderr) + if process.returncode != 0: + raise OSError + return stdout, stderr + @Driver.check_active @step(title='call', args=['args']) @@ -133,6 +155,19 @@ class SigrokDriver(SigrokCommon): @Driver.check_active def capture(self, filename, samplerate="200k"): + """ + Starts to capture samples, in the background. + + Args: + filename: the path to the file where the capture is saved. + samplerate: the sample-rate of the capture + + Raises: + RuntimeError() if a capture is already running. + """ + if self._running: + raise RuntimeError("capture is already running") + self._filename = filename self._basename = os.path.basename(self._filename) self.logger.debug( @@ -145,6 +180,7 @@ def capture(self, filename, samplerate="200k"): filename = os.path.join(self._tmpdir, self._basename) cmd.append(filename) self._call_with_driver(*cmd) + args = self.sigrok.command_prefix + ['test', '-e', filename] while subprocess.call(args): @@ -162,12 +198,65 @@ def capture(self, filename, samplerate="200k"): self._running = True + @Driver.check_active + def capture_for_time(self, filename, time_ms, samplerate="200k"): + """ + Captures samples for a specified time (ms). + + Blocks while capturing. + + Args: + filename: the path to the file where the capture is saved. + time: time (in ms) for capture duration + samplerate: the sample-rate of the capture + + Returns: + The capture as a list containing dict's with fields "Time" + and the channel names + + Raises: + OSError() if the subprocess returned with non-zero return-code + """ + return self._capture_blocking(filename, ["--time", str(time_ms)], samplerate) + + @Driver.check_active + def capture_samples(self, filename, samples, samplerate="200k"): + """ + Captures a specified number of samples. + + Blocks while capturing. + + Args: + filename: the path to the file where the capture is saved. + samples: number of samples to capture + samplerate: the sample-rate of the capture + + Returns: + The capture as a list containing dict's with fields "Time" + and the channel names + """ + return self._capture_blocking(filename, ["--samples", str(samples)], samplerate) + @Driver.check_active def stop(self): - assert self._running + """ + Stops the capture and returns recorded samples. + + Note that this method might block for several seconds because it needs + to wait for output, parse the capture file and prepare the list + containing the samples. + + Returns: + The capture as a list containing dict's with fields "Time" + and the channel names + + Raises: + RuntimeError() if capture has not been started + """ + if not self._running: + raise RuntimeError("no capture started yet") self._running = False - fnames = ['time'] - fnames.extend(self.sigrok.channels.split(',')) + csv_filename = f'{os.path.splitext(self._basename)[0]}.csv' # sigrok-cli can be quit through any keypress @@ -175,16 +264,49 @@ def stop(self): self.logger.debug("stdout: %s", stdout) self.logger.debug("stderr: %s", stderr) - # Convert from .sr to .csv + self._convert_sr_csv(os.path.join(self._tmpdir, self._basename), + os.path.join(self._tmpdir, csv_filename)) + self._transfer_tmp_file(csv_filename) + return self._process_csv(csv_filename) + + def _capture_blocking(self, filename, capture_args, samplerate): + self._filename = filename + self._basename = os.path.basename(self._filename) + csv_filename = f'{os.path.splitext(self._basename)[0]}.csv' + self.logger.debug( + "Saving to: %s with basename: %s", self._filename, self._basename + ) + cmd = [ + "-l", "4", "--config", f"samplerate={samplerate}", + *capture_args, "-o" + ] + filename = os.path.join(self._tmpdir, self._basename) + cmd.append(filename) + self._call_with_driver_blocking(*cmd, log_output=True) + + args = self.sigrok.command_prefix + ['test', '-e', filename] + + while subprocess.call(args): + sleep(0.1) + + self._convert_sr_csv(os.path.join(self._tmpdir, self._basename), + os.path.join(self._tmpdir, csv_filename)) + self._transfer_tmp_file(csv_filename) + return self._process_csv(csv_filename) + + def _convert_sr_csv(self, file_path_sr, file_path_scv): cmd = [ '-i', - os.path.join(self._tmpdir, self._basename), '-O', 'csv:time=true', '-o', - os.path.join(self._tmpdir, csv_filename) + file_path_sr, '-O', 'csv:time=true', '-o', + file_path_scv ] self._call(*cmd) stdout, stderr = self._process.communicate() self.logger.debug("stdout: %s", stdout) self.logger.debug("stderr: %s", stderr) + + + def _transfer_tmp_file(self, csv_filename): if isinstance(self.sigrok, NetworkSigrokUSBDevice): subprocess.call([ 'scp', f'{self.sigrok.host}:{os.path.join(self._tmpdir, self._basename)}', @@ -201,6 +323,16 @@ def stop(self): stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + else: + shutil.copyfile( + os.path.join(self._tmpdir, self._basename), self._filename + ) + + def _process_csv(self, csv_filename): + fnames = ['time'] + fnames.extend(self.sigrok.channels.split(',')) + + if isinstance(self.sigrok, NetworkSigrokUSBDevice): with open(os.path.join(self._local_tmpdir, csv_filename)) as csv_file: # skip first 5 lines of the csv output, contains metadata and fieldnames @@ -208,9 +340,6 @@ def stop(self): next(csv_file) return list(csv.DictReader(csv_file, fieldnames=fnames)) else: - shutil.copyfile( - os.path.join(self._tmpdir, self._basename), self._filename - ) with open(os.path.join(self._tmpdir, csv_filename)) as csv_file: # skip first 5 lines of the csv output, contains metadata and fieldnames for _ in range(0, 5): @@ -219,6 +348,15 @@ def stop(self): @Driver.check_active def analyze(self, args, filename=None): + """ + Analyzes captured data through `sigrok-cli`'s command line interface + + Args: + args: args to `sigrok-cli` + + Returns: + A dictionary containing the matched groups. + """ annotation_regex = re.compile(r'(?P\d+)-(?P\d+) (?P[\w\-]+): (?P[\w\-]+): (?P".*)') # pylint: disable=line-too-long if not filename and self._filename: filename = os.path.join(self._tmpdir, self._basename) @@ -408,12 +546,15 @@ def stop(self): Raises: RuntimeError() if capture has not been started + OSError() if the subprocess returned with non-zero return-code """ if not self._running: raise RuntimeError("no capture started yet") while not self._timeout.expired: if self._process.poll() is not None: # process has finished. no need to wait for the timeout + if self._process.returncode != 0: + raise OSError break time.sleep(0.1) else: diff --git a/tests/test_sigrok.py b/tests/test_sigrok.py index a6ddcf315..f81f90c9a 100644 --- a/tests/test_sigrok.py +++ b/tests/test_sigrok.py @@ -11,6 +11,8 @@ pytestmark = pytest.mark.skipif(not which("sigrok-cli"), reason="sigrok not available") +VENDOR_ID = "0925" +PRODUCT_ID = "3881" def test_sigrok_resource(target): r = SigrokUSBDevice(target, name=None, match={"sys_name": "1-12"}, driver='fx2lafw', channels="D0,D1") @@ -23,8 +25,8 @@ def test_sigrok_driver(target): @pytest.mark.sigrokusb -def test_sigrok_usb_driver(target, tmpdir): - r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": "3881", "ID_VENDOR_ID": "0925"}, driver='fx2lafw', channels="D0,D1") +def test_sigrok_usb_driver_capture(target, tmpdir): + r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": PRODUCT_ID, "ID_VENDOR_ID": VENDOR_ID}, driver='fx2lafw', channels="D0,D1") d = SigrokDriver(target, name=None) target.activate(d) record = tmpdir.join("output.sr") @@ -32,10 +34,36 @@ def test_sigrok_usb_driver(target, tmpdir): sleep(5) samples = d.stop() assert os.path.getsize(record) > 0 - assert samples != None + assert samples is not None assert list(samples[0].keys()) == ['time', 'D0', 'D1'] assert list(samples[-1].keys()) == ['time', 'D0', 'D1'] +@pytest.mark.sigrokusb +def test_sigrok_usb_driver_blocking_samples(target, tmpdir): + r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": PRODUCT_ID, "ID_VENDOR_ID": VENDOR_ID}, driver='fx2lafw', channels="D0,D1") + d = SigrokDriver(target, name=None) + target.activate(d) + record = tmpdir.join("output.sr") + samples = d.capture_samples(record, 100) + assert os.path.getsize(record) > 0 + assert samples is not None + assert len(samples) == 100 + + +@pytest.mark.sigrokusb +def test_sigrok_usb_driver_blocking_time(target, tmpdir): + r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": PRODUCT_ID, "ID_VENDOR_ID": VENDOR_ID}, driver='fx2lafw', channels="D0,D1") + d = SigrokDriver(target, name=None) + target.activate(d) + record = tmpdir.join("output.sr") + samples = d.capture_for_time(record, 101) # sigrok-cli captures 5ms less than specified. + assert os.path.getsize(record) > 0 + assert samples is not None + assert list(samples[0].keys()) == ['time', 'D0', 'D1'] + assert list(samples[-1].keys()) == ['time', 'D0', 'D1'] + time = float(samples[-1]['time']) - float(samples[0]['time']) + assert time >= 100_000 + def test_sigrok_power_driver(target): r = SigrokUSBSerialDevice(target, name=None, driver='manson-hcs-3xxx') r.avail = True