diff --git a/pyproject.toml b/pyproject.toml index d834504..89924fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ packages = [{include = "netrics", from = "src"}] [tool.poetry.dependencies] python = "^3.8" -fate-scheduler = "0.1.0-rc.6" +fate-scheduler = "0.1.0-rc.7" netifaces = "^0.11.0" [tool.poetry.dev-dependencies] @@ -25,6 +25,8 @@ netrics = "netrics:main" # built-in measurement modules netrics-dev = "netrics.measurement.dev:main" netrics-dns-latency = "netrics.measurement.dns_latency:main" +netrics-lml = "netrics.measurement.lml:main" +netrics-lml-scamper = "netrics.measurement.lml:main" netrics-ndt7 = "netrics.measurement.ndt7:main" netrics-ookla = "netrics.measurement.ookla:main" netrics-ping = "netrics.measurement.ping:main" diff --git a/src/netrics/conf/include/measurements.yaml b/src/netrics/conf/include/measurements.yaml index f5e343a..5d47515 100644 --- a/src/netrics/conf/include/measurements.yaml +++ b/src/netrics/conf/include/measurements.yaml @@ -1,5 +1,5 @@ ping: - schedule: "*/5 * * * *" + schedule: "H/5 * * * *" param: destinations: # network locator: results label @@ -23,3 +23,6 @@ ping: 77.67.119.129: Paris 195.89.146.193: Stockholm 190.98.158.1: Sao_Paulo + +# lml-scamper: +# schedule: "H/5 * * * *" diff --git a/src/netrics/measurement/builtin/netrics-lml.py b/src/netrics/measurement/builtin/netrics-lml.py deleted file mode 100644 index 4138ad7..0000000 --- a/src/netrics/measurement/builtin/netrics-lml.py +++ /dev/null @@ -1,233 +0,0 @@ -from json.decoder import JSONDecodeError -import subprocess as sp -import json -import sys -import types -import ipaddress -from shutil import which - -# Global error codes -SUCCESS = 0 -CONFIG_ERROR = 20 -BIN_ERROR = 21 - -# Dig error codes -USAGE_ERROR = 1 -BATCH_FILE = 8 -NO_REPLY = 9 -INTERNAL_ERROR = 10 - -# Scamper error codes -SCAMPER_CONFIG_ERROR = 255 - -# Default input parameters -PARAM_DEFAULTS = {'target': '8.8.8.8', - 'attempts': 3, - 'timeout': 5, - 'verbose': 0} - 
-SCAMPER_BIN = "scamper" - - -def is_executable(name): - """ - Checks whether `name` is on PATH and marked as executable - """ - if which(name) is None: - return BIN_ERROR - return SUCCESS - - -def stdin_parser(): - """ - Verifies the type of the input parameters - - Returns: - params: A dict containing the input parameters. - exit_code: Exit code, 20 if unexpected type - """ - - # Read config from stdin and fill omitted params with default - params = dict(PARAM_DEFAULTS, **json.load(sys.stdin)) - exit_code = SUCCESS - - # Check type of parameter - try: - params['attempts'] = str(int(params['attempts'])) - params['timeout'] = str(int(params['timeout'])) - except ValueError: - exit_code = CONFIG_ERROR - if str(params['verbose']).lower() in ['true', '1']: - params['verbose'] = True - elif str(params['verbose']).lower() in ['false', '0']: - params['verbose'] = False - else: - exit_code = CONFIG_ERROR - - return params, exit_code - - -def parse_lml(out): - """ - Parses traceroute output and returns last mile info - """ - - res = {} - - for line in out: - try: - record = json.loads(line) - except ValueError: - continue - - if record.get('type') != 'trace': - continue - - res['src'] = record['src'] - res['dst'] = record['dst'] - res['attempts'] = record['attempts'] - - for i in range(record['probe_count']): - hop = record['hops'][i] - - # Check to see if we have ID'ed last mile hop IP addr - if 'last_mile_ip' in res: - if hop['addr'] != res['last_mile_ip']: - break - else: - res['rtts'].append(hop['rtt']) - - # Otherwise, see if this is last mile hop - elif not ipaddress.ip_address(hop['addr']).is_private: - res['last_mile_ip'] = hop['addr'] - res['rtts'] = [hop['rtt']] - - return res - - -def parse_scamper_stderr(exit_code, verbose, stderr): - """ - Handles exit code and returns correct error message - """ - - if exit_code == SUCCESS: - return {'retcode': exit_code, - 'message': 'Success'} if verbose else None - elif exit_code == SCAMPER_CONFIG_ERROR: - return 
{'retcode': exit_code, 'message': 'Scamper misconfigured'} - elif exit_code > 0: - return {'retcode': exit_code, 'message': stderr} - - else: - return None - - -def parse_dig_stderr(exit_code, verbose, stderr): - """ - Parse dig exit code and return interpretable error. Error - messages based on Dig man page. - Attributes: - exit_code: The return code from the dig command. - verbose: Module parameter to indicate verbose output. - stderr: Stderr returned by dig. - """ - - if exit_code == SUCCESS: - return {'retcode': exit_code, - 'message': 'Success'} if verbose else None - - elif exit_code == USAGE_ERROR: - return {'retcode': exit_code, 'message': 'Usage Error'} - elif exit_code == BATCH_FILE: - return {'retcode': exit_code, 'message': "Couldn't open batch file"} - elif exit_code == NO_REPLY: - return {'retcode': exit_code, 'message': "No reply from server"} - elif exit_code == INTERNAL_ERROR: - return {'retcode': exit_code, 'message': "Internal error"} - elif exit_code > 0: - return {'retcode': exit_code, 'message': stderr} - - else: - return None - - -def get_ip(hostname): - """ - Perform DNS query on hostname, return first IP - """ - - cmd = ['dig', '+short', hostname] - - try: - res = sp.run(cmd, capture_output=True, check=True) - except sp.CalledProcessError as err: - return err.returncode, err.stderr - - ipaddr = res.stdout.decode('utf-8').split('\n')[0] - return res.returncode, ipaddr - - -def main(): - - # Initialized stored structs - stdout_res = {} - stderr_res = {} - exit_code = SUCCESS - - # Check that scamper is executable and on PATH - if not is_executable(SCAMPER_BIN): - stderr_res['bin'] = {'retcode': exit_code, - 'message': 'Scamper either not on PATH or not executable'} - json.dump(stderr_res, sys.stderr) - sys.exit(exit_code) - - # Parse stdin - params, exit_code = stdin_parser() - if exit_code != SUCCESS: - stderr_res['stdin'] = {'retcode': exit_code, - 'message': 'Config param types error'} - json.dump(stderr_res, sys.stderr) - 
sys.exit(exit_code) - - # Resolve target if given as hostname - try: - ipaddress.ip_address(params['target']) - except ValueError: - recode, target_ip = get_ip(params['target']) - if stderr_dst := parse_dig_stderr(recode, params['verbose'], target_ip): - if "dig" not in stderr_res: - stderr_res['dig'] = {} - stderr_res['dig'][params['target']] = stderr_dst - - cmd = ( - SCAMPER_BIN, - '-O', 'json', - '-i, target_ip, - '-c', f'trace -P icmp-paris -q {params["attempts"]} -w {params["timeout"]} -Q', - ) - - # Run scamper traceroute - try: - lml_res = sp.run(cmd, capture_output=True, text=True, check=True) - output = lml_res.stdout.splitlines() - stdout_res = parse_lml(output) - if error := parse_scamper_stderr(lml_res.returncode, - params['verbose'], - lml_res.stderr.decode('utf-8')): - stderr_res['trace'] = error - except sp.CalledProcessError as err: - stderr_res['trace'] = parse_scamper_stderr(err.returncode, - params['verbose'], - err.stderr.decode('utf-8')) - exit_code = err.returncode - - # Communicate stdout, stderr, exit code - if stdout_res: - json.dump(stdout_res, sys.stdout) - if stderr_res: - json.dump(stderr_res, sys.stderr) - sys.exit(exit_code) - - -if __name__ == '__main__': - main() diff --git a/src/netrics/measurement/lml.py b/src/netrics/measurement/lml.py new file mode 100644 index 0000000..05b53fd --- /dev/null +++ b/src/netrics/measurement/lml.py @@ -0,0 +1,232 @@ +"""Measure latency to the "last mile" host via scamper.""" +import json +import random +import shutil +import statistics +import subprocess +from ipaddress import ip_address +from itertools import groupby + +from schema import Optional + +from netrics import task + +from .common import AddressLookups, require_net + + +# +# params schema +# +PARAMS = task.schema.extend('last_mile_rtt', { + # destinations: (scamper): list of hosts (IP address preferred!) 
+ # OR mapping of hosts to their labels (for results) + # (Note: will select ONE) + Optional('destinations', + default={'8.8.8.8': 'Google_DNS', + '1.1.1.1': 'Cloudflare_DNS'}): task.schema.DestinationCollection(), + + # attempts: (scamper): natural number + Optional('attempts', default='3'): task.schema.NaturalStr('attempts'), + + # timeout: (scamper): positive integer seconds + Optional('timeout', default='5'): task.schema.PositiveIntStr('timeout', 'seconds'), + + # include: mapping + Optional('include', + default={'last_mile_ip': False, + 'source_ip': False}): { + # last_mile_ip: include detected IP address of last mile host in results + Optional('last_mile_ip', default=False): bool, + + # source_ip: include device LAN IP address in results + Optional('source_ip', default=False): bool, + }, +}) + + +@task.param.require(PARAMS) +@require_net +def main(params): + """Measure latency to the "last mile" host via scamper. + + The local network, and then internet hosts (as configured in global + defaults), are queried first, to ensure network operation and + internet accessibility. (See: `require_net`.) + + A scamper trace is then executed against a configured internet host. + (A domain name *may* be configured in lieu of an IP address; the + name will first be resolved to its IP.) + + Multiple host targets *may* be specified, in which case the target + to trace is randomly selected; and, in the case of failure, + additional targets will be selected, sequentially. + + The "last mile" host -- *i.e.* the first "hop" outside of the + client's private network -- is identified from scamper trace + results, and this host's "round-trip time" (RTT) to respond is + parsed and written out according to configuration. 
+ + """ + # ensure scamper on PATH + scamper_path = shutil.which('scamper') + if scamper_path is None: + task.log.critical("scamper executable not found") + return task.status.file_missing + + # resolve destination(s) given by domain to IP + address_lookups = AddressLookups(params.destinations) + + for hostname in address_lookups.unresolved: + task.log.error(host=hostname, + status=address_lookups.queries[hostname].returncode, + msg='domain look-up failure') + + if not address_lookups.resolved: + task.log.critical(errors=len(address_lookups.unresolved), + msg='no addresses to query') + return task.status.no_host + + # randomize target from resolved destination(s) + target_ips = list(address_lookups.resolved) + random.shuffle(target_ips) + + # try target(s) falling back sequentially + trace_cmd = f'trace -Q -P icmp-paris -q {params.attempts} -w {params.timeout}' + + for target_ip in target_ips: + try: + process = subprocess.run( + ( + scamper_path, + '-O', 'json', + '-c', trace_cmd, + '-i', target_ip, + ), + capture_output=True, + check=True, + text=True, + ) + except subprocess.CalledProcessError as exc: + # scamper shouldn't really error this way: this is serious + task.log.critical( + dest=target_ip, + status=f'Error ({exc.returncode})', + args=exc.cmd, + stdout=exc.stdout, + stderr=exc.stderr, + ) + return task.status.software_error + + # + # retrieve any returned results + # + # scamper returns results for each target given; we've given + # one target and expect to receive one results set. + # + # however, scamper *could* fail to return proper and/or desired + # results: here, evaluating either to *no* results set (length + # zero) OR to a results set equal to None. we'll treat either + # case the same. + # + try: + (results,) = parse_output(process.stdout) + except ValueError: + results = None + + if results is not None: + # we got what we wanted! we're done. + break + + # + # query against this target failed... 
def parse_output(output):
    """Parse scamper JSON-lines output to return last mile info.

    Each non-blank line of ``output`` is decoded as one JSON document.
    Only objects whose ``type`` is ``'trace'`` are handed to
    `prepare_result`; any other document is ignored.

    Returns a list with one entry per trace record, in output order.
    An entry is ``None`` when no last-mile hop could be identified in
    that trace.

    Raises ``ValueError`` (``json.JSONDecodeError``) if a non-blank
    line is not valid JSON.

    """
    results = []

    for line in output.splitlines():
        # tolerate blank/whitespace-only lines rather than failing to
        # decode them
        if not line.strip():
            continue

        record = json.loads(line)

        # look up 'type' defensively: a document lacking the key (or one
        # that is not an object at all) is simply not a trace record
        if isinstance(record, dict) and record.get('type') == 'trace':
            results.append(prepare_result(record))

    return results


def prepare_result(record):
    """Construct last mile result from scamper trace record.

    The "last mile" is taken to be the first hop whose address is not
    private; the consecutive replies from that address supply its RTT
    samples.

    Returns a dict of ``last_mile_tr_*`` items (trace destination and
    source, last-mile address, the RTT samples and their max, min,
    median and deviation), or ``None`` if the trace contains no
    non-private hop.

    """
    if record['stop_reason'] != 'COMPLETED':
        task.log.warning(dest=record['dst'],
                         count=record['probe_count'],
                         stop_reason=record['stop_reason'])

    # NOTE(review): presumably a trace which received no replies may omit
    # 'hops' entirely -- confirm against scamper output. Either way, a
    # missing list is treated the same as an empty one (no result).
    hop_groups = groupby(record.get('hops', ()), lambda hop: hop['addr'])

    for (addr, trials) in hop_groups:
        if ip_address(addr).is_private:
            continue

        # the first non-private "hop" is the "last mile"
        rtts = [trial['rtt'] for trial in trials]

        # stdev() requires at least two samples and otherwise raises
        # StatisticsError -- a ValueError subclass, so a caller catching
        # ValueError would silently discard an otherwise valid
        # single-sample measurement. Report zero deviation instead.
        deviation = statistics.stdev(rtts) if len(rtts) > 1 else 0.0

        return {
            'last_mile_tr_dst': record['dst'],
            'last_mile_tr_src': record['src'],
            'last_mile_tr_addr': addr,
            'last_mile_tr_rtt_ms': rtts,
            'last_mile_tr_rtt_max_ms': max(rtts),
            'last_mile_tr_rtt_min_ms': min(rtts),
            'last_mile_tr_rtt_median_ms': statistics.median(rtts),
            'last_mile_tr_rtt_mdev_ms': round(deviation, 3),
        }

    # no last-mile/WAN data found
    return None