From c71d0d3c8401eb6e1b1d6f34fe87f4705c3197d0 Mon Sep 17 00:00:00 2001 From: Jesse London Date: Tue, 24 Jan 2023 15:33:00 -0600 Subject: [PATCH] Polishes up built-in ping latency measurement for near parity with existing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …Plus additional functionality: * complete measurement configurability (via fate/netrics measurements file) * validation of structured configuration via schema * pre-measurement checks of execution environment (localhost) and LAN (gateway) * parallelization of internet ping requests against configured targets Future work: * "near parity" is defined by the deferral of the command-debugging zip archive to future work Issue tracking: * part of #3: "built-in measurements" * resolves #12: "…common/global folder for stdlib netrics tests" * resolves #17: "netrics-ping returns errors if one address of the list is unreachable…" --- pyproject.toml | 2 +- src/netrics/__init__.py | 2 +- src/netrics/errno.py | 3 - src/netrics/measurement/ping.py | 345 ++++++++++++++++++-------------- src/netrics/task/__init__.py | 8 + src/netrics/task/result.py | 27 +++ src/netrics/task/sysexit.py | 12 ++ 7 files changed, 247 insertions(+), 152 deletions(-) delete mode 100644 src/netrics/errno.py create mode 100644 src/netrics/task/__init__.py create mode 100644 src/netrics/task/result.py create mode 100644 src/netrics/task/sysexit.py diff --git a/pyproject.toml b/pyproject.toml index 16f3d26..aa8399c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ packages = [{include = "netrics", from = "src"}] [tool.poetry.dependencies] python = "^3.8" -fate-scheduler = "0.1.0-rc.2" +fate-scheduler = "0.1.0-rc.3" netifaces = "^0.11.0" [tool.poetry.dev-dependencies] diff --git a/src/netrics/__init__.py b/src/netrics/__init__.py index 51bf111..88182ef 100644 --- a/src/netrics/__init__.py +++ b/src/netrics/__init__.py @@ -1 +1 @@ -from .__main__ import main, daemon, serve # noqa: F401 +from .__main__ import conf, daemon, main, serve # noqa: F401 diff --git a/src/netrics/errno.py b/src/netrics/errno.py deleted file mode 100644 index e5fea19..0000000 --- a/src/netrics/errno.py +++ /dev/null @@ -1,3 +0,0 @@ -"""Global error codes""" - -CONFIG_ERROR = 78 diff --git a/src/netrics/measurement/ping.py b/src/netrics/measurement/ping.py index bb1e68c..e377e2f 100644 --- a/src/netrics/measurement/ping.py +++ b/src/netrics/measurement/ping.py @@ -1,91 +1,204 @@ -import subprocess -import json +"""Measure ping latency to configured hosts.""" import re -import sys -import types - -from netrics import errno - - -# Local error codes -SUCCESS = 0 -NO_REPLY = 1 -LAN_ERROR = 2 - -# Default parameters -PARAM_DEFAULTS = { - "targets": [ - "google.com", - "facebook.com", - "nytimes.com", - ], - "count": 10, - "interval": 0.25, - "timeout": 5, - "verbose": False, +import subprocess +from collections import defaultdict +from numbers import Real + +from schema import ( + And, + Or, + Optional, + Use, + SchemaError, +) + +from netrics import task + +from .common import require_lan + + +# +# ping exit codes +# +# if ping returns any code other than the below something is *very* wrong +# +# (the error code 2 is included -- unclear if ping *can* return anything higher than that.) +# +PING_CODES = { + 0, # success + 1, # no reply + 2, # error (e.g. dns) } -CONFIG_MESSAGE = "Parameter type error (count and timeout must be of type int)" - - -def get_params(): - """Ensure the type of the input parameters. - - Returns - ------- - params: A dict containing input parameters - - Raises - ------ - ValueError - """ - # Read params from stdin with defaults - params = PARAM_DEFAULTS.copy() - if input_ := sys.stdin.read(): - params.update(json.loads(input_)) +# +# params +# +# input -- a (deserialized) mapping -- is entirely optional. +# +# a dict, of the optional param keys, their defaults, and validations of +# their values, is given below. +# + +Text = And(str, len) # non-empty str + +PARAM_SCHEMA = { + # destinations: (ping): list of hosts + # OR mapping of hosts to their labels (for results) + Optional('destinations', + default=('google.com', + 'facebook.com', + 'nytimes.com')): Or({Text: Text}, + And([Text], + lambda dests: len(dests) == len(set(dests))), + error="destinations: must be non-repeating list " + "of network locators or mapping of these " + "to their result labels"), + + # count: (ping): natural number + Optional('count', default='10'): And(int, + lambda count: count > 0, + Use(str), + error="count: int must be greater than 0"), + + # interval: (ping): int/decimal seconds no less than 2ms + Optional('interval', default='0.25'): And(Real, + lambda interval: interval >= 0.002, + Use(str), + error="interval: seconds must not be less than 2ms"), + + # deadline: (ping): positive integer seconds + Optional('deadline', default='5'): And(int, + lambda deadline: deadline >= 0, + Use(str), + error="deadline: int seconds must not be less than 0"), + + # result: mappping + Optional('result', default={'flat': True, + 'label': 'ping_latency', + 'meta': True}): { + # flat: flatten ping destination results dict to one level + Optional('flat', default=True): bool, + + # wrap: wrap the above (whether flat or not) in a measurement label + Optional('label', default='ping_latency'): Or(False, None, Text), + + # meta: wrap all of the above (whatever it is) with metadata (time, etc.) + Optional('meta', default=True): bool, + }, +} - # Check type of parameters (count and timeout must be int) - params['interval'] = str(float(params['interval'])) - params['count'] = str(int(params['count'])) - params['timeout'] = str(int(params['timeout'])) - return types.SimpleNamespace(**params) +@require_lan +def main(): + """Measure ping latency to configured hosts. + The local network is queried first to ensure operation. + (See: `require_lan`.) -def result_log(returncode, stderr, verbose): - """Construct log message for given result. + Ping queries are then executed, in parallel, to each configured host + (`destinations`) according to configured ping command arguments: + `count`, `interval` and `deadline`. - Arguments - --------- - returncode: The return code from the ping command. - stderr: Stderr returned by ping. - verbose: Module parameter to indicate verbose output. + Ping outputs are parsed into structured results and written out + according to configuration (`result`). """ - if returncode == SUCCESS: - return {'retcode': returncode, 'message': "Success"} if verbose else None - - if returncode == NO_REPLY: - return {'retcode': returncode, - 'message': "Transmission successful, some packet loss"} if verbose else None - - if returncode == LAN_ERROR: - return {'retcode': returncode, "message": "Local network error"} - - return {'retcode': returncode, 'message': stderr} - - -def parse_result(output): - """Parse ping output and returns dict with results.""" - - # Extract packet loss stats - pkt_loss_match = re.search(r', ([0-9.]*)% packet loss', output, re.MULTILINE) - - if pkt_loss_match: - pkt_loss = float(pkt_loss_match.group(1)) - else: - pkt_loss = -1.0 + # read input params + try: + params = task.param.read(schema=PARAM_SCHEMA) + except SchemaError as exc: + task.log.critical(error=str(exc), msg="input error") + return task.status.conf_error + + # parallelize pings + processes = { + destination: subprocess.Popen( + ( + 'ping', + '-c', params.count, + '-i', params.interval, + '-w', params.deadline, + destination, + ), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) for destination in params.destinations + } + + # wait and collect outputs + outputs = {destination: process.communicate() for (destination, process) in processes.items()} + + # check for exceptions + failures = [ + (destination, process, outputs[destination]) + for (destination, process) in processes.items() + if process.returncode not in PING_CODES + ] + + if failures: + total_failures = len(failures) + + # directly log first 3 failures + for (fail_count, (destination, process, (stdout, stderr))) in enumerate(failures[:3], 1): + task.log.critical( + dest=destination, + status=f'Error ({process.returncode})', + failure=f"({fail_count}/{total_failures})", + args=process.args[:-1], + stdout=stdout, + stderr=stderr, + ) + + if fail_count < total_failures: + task.log.critical( + dest='...', + status='Error (...)', + failure=f"(.../{total_failures})", + args='...', + stdout='...', + stderr='...', + ) + + return task.status.software_error + + # log summary/general results + statuses = defaultdict(int) + for process in processes.values(): + statuses[process.returncode] += 1 + + task.log.info({'dest-status': statuses}) + + # parse detailed results + results = { + destination: parse_output(stdout) + for (destination, (stdout, _stderr)) in outputs.items() + } + + # label results + if isinstance(params.destinations, dict): + results = { + params.destinations[destination]: result + for (destination, result) in results.items() + } + + # flatten results + if params.result.flat: + results = {f'{label}_{feature}': value + for (label, data) in results.items() + for (feature, value) in data.items()} + + # write results + task.result.write(results, + label=params.result.label, + meta=params.result.meta) + + return task.status.success + + +def parse_output(output): + """Parse ping output and return dict of results.""" # Extract RTT stats rtt_match = re.search( @@ -93,78 +206,16 @@ def parse_result(output): output ) - if rtt_match: - rtt_values = [float(value) for value in rtt_match.groups()] - else: - rtt_values = [-1.0] * 4 - - rtt_keys = ('rtt_min', 'rtt_avg', 'rtt_max', 'rtt_stddev') - - rtt_stats = dict(zip(rtt_keys, rtt_values)) - - return dict(rtt_stats, pkt_loss=pkt_loss) - - -def main(): - # Parse stdin params - try: - params = get_params() - except ValueError: - json.dump({'error': CONFIG_MESSAGE}, sys.stderr) - sys.exit(errno.CONFIG_ERROR) - - # Launch pings - procs = [] - for dst in params.targets: - args = ( - 'ping', - '-i', params.interval, - '-c', params.count, - '-w', params.timeout, - dst, - ) - - proc = subprocess.Popen( - args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) + rtt_values = [float(value) for value in rtt_match.groups()] if rtt_match else [-1.0] * 4 - procs.append((dst, proc)) + rtt_keys = ('rtt_min_ms', 'rtt_avg_ms', 'rtt_max_ms', 'rtt_mdev_ms') - # Process results - dst_code = SUCCESS - log = {} - result = {} + rtt_stats = zip(rtt_keys, rtt_values) - for (dst, proc) in procs: - proc.wait() - - # We'll report the "worst" exit code as our own - dst_code = max(dst_code, proc.returncode) - - # Parse ping exit code and write to log if message or error - if dst_log := result_log(proc.returncode, proc.stderr.read(), params.verbose): - log[dst] = dst_log - - # If LAN error, don't expect a result - if proc.returncode >= LAN_ERROR: - continue - - result[dst] = parse_result(proc.stdout.read()) - - exit_code = dst_code if dst_code > NO_REPLY else 0 - - # Write out logs and results - if exit_code == 0: - json.dump(result, sys.stdout) - - if log: - json.dump(log, sys.stderr) - - sys.exit(exit_code) + # Extract packet loss stats + pkt_loss_match = re.search(r', ([0-9.]*)% packet loss', output, re.MULTILINE) + pkt_loss = float(pkt_loss_match.group(1)) if pkt_loss_match else -1.0 -if __name__ == '__main__': - main() + # Return combined dict + return dict(rtt_stats, packet_loss_pct=pkt_loss) diff --git a/src/netrics/task/__init__.py b/src/netrics/task/__init__.py new file mode 100644 index 0000000..a0bc0df --- /dev/null +++ b/src/netrics/task/__init__.py @@ -0,0 +1,8 @@ +from fate.task import ( # noqa: F401 + log, + param, +) + +from . import result # noqa: F401 + +from .sysexit import status # noqa: F401 diff --git a/src/netrics/task/result.py b/src/netrics/task/result.py new file mode 100644 index 0000000..a112286 --- /dev/null +++ b/src/netrics/task/result.py @@ -0,0 +1,27 @@ +"""Task result recording compatible with the Fate scheduler.""" +import time + +import fate.task.result + + +def write(results, /, label=None, meta=True, **kwargs): + """Write task results. + + Wraps results in metadata, by default, according to `meta=True`; + and, places results under the key `label`, if provided. + + See `fate.task.result.write` for further details. + + """ + if label: + results = {label: results} + + if meta: + results = { + 'Measurements': results, + 'Meta': { + 'Time': time.time(), + }, + } + + return fate.task.result.write(results, **kwargs) diff --git a/src/netrics/task/sysexit.py b/src/netrics/task/sysexit.py new file mode 100644 index 0000000..ff6a61e --- /dev/null +++ b/src/netrics/task/sysexit.py @@ -0,0 +1,12 @@ +"""Common exit codes""" +import enum + + +class status(enum.IntEnum): + + success = 0 + no_host = 68 + software_error = 70 + os_error = 71 + file_missing = 72 + conf_error = 78