Skip to content

Commit

Permalink
speed-ndt7: completed & polished measurement using NDT7 client
Browse files Browse the repository at this point in the history
* implemented timeout
* fixed output parsing
* etc.

see #23
  • Loading branch information
jesteria committed Jun 12, 2023
1 parent ffc772d commit b1bee2f
Showing 1 changed file with 101 additions and 52 deletions.
153 changes: 101 additions & 52 deletions src/netrics/measurement/ndt7.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,66 @@
#!/usr/bin/env python3
"""Measure Internet bandwidth, *etc*., via Measurement Lab's NDT7 Client CLI."""
import subprocess as sp
"""Measure Internet bandwidth, *etc*., via Measurement Lab's NDT7 client."""
import json
import subprocess

from schema import Optional, Or

from netrics import task

from .common import require_net


PARAMS = task.schema.extend('ndt7', {
# exec: speedtest executable name or path
# exec: ndt7-client executable name or path
Optional('exec', default='ndt7-client'): task.schema.Command(
error='exec: must be an executable on PATH or file system absolute path to be executable'
error='exec: must be an executable on PATH or file system absolute path to executable'
),

# timeout: seconds after which test is canceled
# (0, None, False, etc. to disable timeout)
Optional('timeout', default=60): Or(task.schema.GTZero(),
Optional('timeout', default=45): Or(task.schema.GTZero(),
task.schema.falsey,
error='timeout: seconds greater than zero or '
'falsey to disable'),
})


@task.param.require(PARAMS)
@require_net
def main(params):
"""Measure Internet bandwidth, *etc*., via M-Lab's NDT7 client.
The local network, and then Internet hosts (as configured in global
defaults), are queried first, to ensure network operation and
internet accessibility. (See: `require_net`.)
The ndt7-client binary is then executed.
This binary is presumed to be accessible via PATH at `ndt7-client`.
This PATH look-up name is configurable, and may be replaced with the
absolute file system path, instead (`exec`).
Should the speedtest not return within `timeout` seconds, an error
is returned. (This may be disabled by setting a "falsey" timeout
value.)
In addition to NDT metrics such as download bandwidth
(`download`) and upload bandwidth (`upload`), measurement results
are written to include the key `test_bytes_consumed`. This item is
*not* included under the test's `label`, (regardless of `result`
configuration).
"""
try:
proc = sp.run(
proc = subprocess.run(
(
params.exec,
'-format', 'json',
),
timeout=(params.timeout or None),
capture_output=True,
text=True
)

except sp.TimeoutExpired as exc:
except subprocess.TimeoutExpired as exc:
task.log.critical(
cmd=exc.cmd,
elapsed=exc.timeout,
Expand Down Expand Up @@ -67,8 +92,8 @@ def main(params):
task.log.info(
download=parsed['download'],
upload=parsed['upload'],
bytes_consumed=parsed['meta'].get('total_bytes_consumed'),
downloaduuid=parsed['downloaduuid'],
bytes_consumed=parsed['meta']['total_bytes_consumed'],
uuid_download=parsed['meta']['downloaduuid'],
)

# flatten results
Expand All @@ -82,7 +107,7 @@ def main(params):
results = {'speedtest_ndt7': data}

# extend results with non-measurement data
if 'total_bytes_consumed' in parsed['meta']:
if parsed['meta']['total_bytes_consumed']:
extended = {'test_bytes_consumed': parsed['meta']['total_bytes_consumed']}
else:
extended = None
Expand All @@ -97,46 +122,70 @@ def main(params):


def parse_output(output):
"""Parse output from M-Lab NDT7 client.
if not output:
return None

try:
for obj in output.split("\n")[:-1]:

response = json.loads(obj)
key = response.get("Key", None)
value = response.get("Value", None)

if key == "measurement":
origin = value['Origin']
test = value['Test']
if origin == 'client':
num_bytes = value["AppInfo"]["NumBytes"]
if test == "download":
dl_bytes = num_bytes
else:
ul_bytes = num_bytes

if (not key) & (not value):
result = {
'download': response["Download"]["Value"],
'upload': response["Upload"]["Value"],
'downloadretrans': response["DownloadRetrans"]["Value"],
'minrtt': response["MinRTT"]["Value"],
'server': response["ServerFQDN"],
'server_ip': response["ServerIP"],
'downloaduuid': response["DownloadUUID"],
'meta': {}
}

except (KeyError, ValueError, TypeError) as exc:
print("output parsing error")
return None
else:
result["meta"]["total_bytes_consumed"] = dl_bytes + ul_bytes
return result
Note: Should output not conform to expectations, `None` may be
returned.
"""
try:
#
# output consists of one or more lines of JSON objects
#
# this should entail arbitrary status lines (without -quiet flag)
# followed by a single summary line
#
(*statuses, summary) = (json.loads(line) for line in output.splitlines())

#
# bytes consumed by the tests may only be retrieved from the status
# lines under the measurement key
#
# we may retrieve the total bytes consumed by each test via the
# *last* status line
#
(bytes_dl, bytes_ul) = (
#
# retrieve the *last* matching element by iterating statuses in *reverse*
#
# (if no matching element evaluate 0)
#
next(
(
status['Value']['AppInfo']['NumBytes']
for status in reversed(statuses)
if (
status['Key'] == 'measurement' and
status['Value']['Origin'] == 'client' and
status['Value']['Test'] == test_name
)
),
0,
)
for test_name in ('download', 'upload')
)

if __name__ == '__main__':
main()
#
# all other results are presented by the summary
#
return {
'download': summary['Download']['Throughput']['Value'],
'upload': summary['Upload']['Throughput']['Value'],

'downloadretrans': summary['Download']['Retransmission']['Value'],
'downloadlatency': summary['Download']['Latency']['Value'],

'server': summary['ServerFQDN'],
'server_ip': summary['ServerIP'],

'meta': {
'downloaduuid': summary['Download']['UUID'],
'total_bytes_consumed': bytes_dl + bytes_ul,
}
}
except (KeyError, TypeError, ValueError) as exc:
task.log.error(
error=str(exc),
msg="output parsing error",
)
return None

0 comments on commit b1bee2f

Please sign in to comment.