From 634d3e62079938b4e4f5b6d71fcdfce1b44410e6 Mon Sep 17 00:00:00 2001 From: Jesse London Date: Thu, 8 Jun 2023 15:17:43 -0500 Subject: [PATCH] speed-ndt7: completed & polished measurement using NDT7 client * implemented timeout * fixed output parsing * etc. see #23 --- src/netrics/measurement/ndt7.py | 153 +++++++++++++++++++++----------- 1 file changed, 101 insertions(+), 52 deletions(-) diff --git a/src/netrics/measurement/ndt7.py b/src/netrics/measurement/ndt7.py index 6940ab7..ddd3303 100644 --- a/src/netrics/measurement/ndt7.py +++ b/src/netrics/measurement/ndt7.py @@ -1,41 +1,66 @@ -#!/usr/bin/env python3 -"""Measure Internet bandwidth, *etc*., via Measurement Lab's NDT7 Client CLI.""" -import subprocess as sp +"""Measure Internet bandwidth, *etc*., via Measurement Lab's NDT7 client.""" import json +import subprocess from schema import Optional, Or + from netrics import task + from .common import require_net + PARAMS = task.schema.extend('ndt7', { - # exec: speedtest executable name or path + # exec: ndt7-client executable name or path Optional('exec', default='ndt7-client'): task.schema.Command( - error='exec: must be an executable on PATH or file system absolute path to be executable' + error='exec: must be an executable on PATH or file system absolute path to executable' ), # timeout: seconds after which test is canceled # (0, None, False, etc. to disable timeout) - Optional('timeout', default=60): Or(task.schema.GTZero(), + Optional('timeout', default=45): Or(task.schema.GTZero(), task.schema.falsey, error='timeout: seconds greater than zero or ' 'falsey to disable'), }) + @task.param.require(PARAMS) @require_net def main(params): + """Measure Internet bandwidth, *etc*., via M-Lab's NDT7 client. + + The local network, and then Internet hosts (as configured in global + defaults), are queried first, to ensure network operation and + internet accessibility. (See: `require_net`.) + The ndt7-client binary is then executed. + + This binary is presumed to be accessible via PATH at `ndt7-client`. + This PATH look-up name is configurable, and may be replaced with the + absolute file system path, instead (`exec`). + + Should the speedtest not return within `timeout` seconds, an error + is returned. (This may be disabled by setting a "falsey" timeout + value.) + + In addition to NDT metrics such as download bandwidth + (`download`) and upload bandwidth (`upload`), measurement results + are written to include the key `test_bytes_consumed`. This item is + *not* included under the test's `label`, (regardless of `result` + configuration). + + """ try: - proc = sp.run( + proc = subprocess.run( ( params.exec, '-format', 'json', ), + timeout=(params.timeout or None), capture_output=True, text=True ) - - except sp.TimeoutExpired as exc: + except subprocess.TimeoutExpired as exc: task.log.critical( cmd=exc.cmd, elapsed=exc.timeout, @@ -67,8 +92,8 @@ def main(params): task.log.info( download=parsed['download'], upload=parsed['upload'], - bytes_consumed=parsed['meta'].get('total_bytes_consumed'), - downloaduuid=parsed['downloaduuid'], + bytes_consumed=parsed['meta']['total_bytes_consumed'], + uuid_download=parsed['meta']['downloaduuid'], ) # flatten results @@ -82,7 +107,7 @@ def main(params): results = {'speedtest_ndt7': data} # extend results with non-measurement data - if 'total_bytes_consumed' in parsed['meta']: + if parsed['meta']['total_bytes_consumed']: extended = {'test_bytes_consumed': parsed['meta']['total_bytes_consumed']} else: extended = None @@ -97,46 +122,70 @@ def main(params): def parse_output(output): + """Parse output from M-Lab NDT7 client. - if not output: - return None - - try: - for obj in output.split("\n")[:-1]: - - response = json.loads(obj) - key = response.get("Key", None) - value = response.get("Value", None) - - if key == "measurement": - origin = value['Origin'] - test = value['Test'] - if origin == 'client': - num_bytes = value["AppInfo"]["NumBytes"] - if test == "download": - dl_bytes = num_bytes - else: - ul_bytes = num_bytes - - if (not key) & (not value): - result = { - 'download': response["Download"]["Value"], - 'upload': response["Upload"]["Value"], - 'downloadretrans': response["DownloadRetrans"]["Value"], - 'minrtt': response["MinRTT"]["Value"], - 'server': response["ServerFQDN"], - 'server_ip': response["ServerIP"], - 'downloaduuid': response["DownloadUUID"], - 'meta': {} - } - - except (KeyError, ValueError, TypeError) as exc: - print("output parsing error") - return None - else: - result["meta"]["total_bytes_consumed"] = dl_bytes + ul_bytes - return result + Note: Should output not conform to expectations, `None` may be + returned. + """ + try: + # + # output consists of one or more lines of JSON objects + # + # this should entail arbitrary status lines (without -quiet flag) + # followed by a single summary line + # + (*statuses, summary) = (json.loads(line) for line in output.splitlines()) + + # + # bytes consumed by the tests may only be retrieved from the status + # lines under the measurement key + # + # we may retrieve the total bytes consumed by each test via the + # *last* status line + # + (bytes_dl, bytes_ul) = ( + # + # retrieve the *last* matching element by iterating statuses in *reverse* + # + # (if no matching element evaluate 0) + # + next( + ( + status['Value']['AppInfo']['NumBytes'] + for status in reversed(statuses) + if ( + status['Key'] == 'measurement' and + status['Value']['Origin'] == 'client' and + status['Value']['Test'] == test_name + ) + ), + 0, + ) + for test_name in ('download', 'upload') + ) -if __name__ == '__main__': - main() + # + # all other results are presented by the summary + # + return { + 'download': summary['Download']['Throughput']['Value'], + 'upload': summary['Upload']['Throughput']['Value'], + + 'downloadretrans': summary['Download']['Retransmission']['Value'], + 'downloadlatency': summary['Download']['Latency']['Value'], + + 'server': summary['ServerFQDN'], + 'server_ip': summary['ServerIP'], + + 'meta': { + 'downloaduuid': summary['Download']['UUID'], + 'total_bytes_consumed': bytes_dl + bytes_ul, + } + } + except (KeyError, TypeError, ValueError) as exc: + task.log.error( + error=str(exc), + msg="output parsing error", + ) + return None