diff --git a/pyproject.toml b/pyproject.toml index efcaee9..7b31da0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ netrics = "netrics:main" # built-in measurement modules netrics-dev = "netrics.measurement.dev:main" netrics-dns-latency = "netrics.measurement.dns_latency:main" +netrics-hops-traceroute = "netrics.measurement.hops_traceroute:main" netrics-lml = "netrics.measurement.lml:main" netrics-lml-scamper = "netrics.measurement.lml:main" netrics-lml-traceroute = "netrics.measurement.lml_traceroute:main" diff --git a/src/netrics/conf/include/measurements.yaml b/src/netrics/conf/include/measurements.yaml index 2a06e1c..f68a1bd 100644 --- a/src/netrics/conf/include/measurements.yaml +++ b/src/netrics/conf/include/measurements.yaml @@ -1,3 +1,10 @@ +hops: + command: hops-traceroute + schedule: "H/5 * * * *" + param: + destinations: + www.google.com: google + lml: command: lml-traceroute schedule: "H/5 * * * *" diff --git a/src/netrics/measurement/hops_traceroute.py b/src/netrics/measurement/hops_traceroute.py new file mode 100644 index 0000000..4fd1b2c --- /dev/null +++ b/src/netrics/measurement/hops_traceroute.py @@ -0,0 +1,169 @@ +"""Trace number of "hops" to target destination(s)""" +import operator +import subprocess +import typing + +from schema import Optional + +from netrics import task +from netrics.util.iteration import sequence + +from .common import ( + default, + require_exec, + require_net, +) + + +# +# params schema +# +PARAMS = task.schema.extend('hops_to_target', { + # destinations: (traceroute): list of hosts + # OR mapping of hosts to their labels (for results) + Optional('destinations', + default=default.DESTINATIONS): task.schema.DestinationCollection(), + + # max_hop: (traceroute): natural number + Optional('max_hop', default='20'): task.schema.NaturalStr('max_hop'), + + # tries: (traceroute): natural number + Optional('tries', default='5'): task.schema.NaturalStr('tries'), + + # wait: (traceroute): natural number + Optional('wait', default='2'): task.schema.NaturalStr('wait'), +}) + + +@task.param.require(PARAMS) +@require_exec('traceroute') +@require_net +def main(traceroute, params): + """Trace the number of "hops" -- *i.e.* intermediary hosts -- + between the client and target destination(s). + + The local network, and then internet hosts (as configured in global + defaults), are queried first, to ensure network operation and + internet accessibility. (See: `require_net`.) + + Traceroute is then executed against all configured internet hosts + (`destinations`) in parallel, according to configured traceroute + command arguments: `max_hop`, `tries` and `wait`. + + Traceroute outputs are parsed to retrieve the hop number of each + destination, and structured results are written out according to + configuration (`result`). + + """ + # parallelize traceroutes + processes = { + destination: subprocess.Popen( + ( + traceroute, + '--max-hop', params.max_hop, + '--tries', params.tries, + '--wait', params.wait, + destination, + ), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) for destination in params.destinations + } + + # wait and collect outputs + outputs = {destination: process.communicate() for (destination, process) in processes.items()} + + # parse results + hop_results = [ + HopResult.extract( + destination, + process, + *outputs[destination], + ) + for (destination, process) in processes.items() + ] + + # check for exceptions + (successes, failures) = sequence(operator.attrgetter('hops'), hop_results) + + fail_total = len(failures) + + for (fail_count, hop_result) in enumerate(failures, 1): + process = processes[hop_result.dest] + (stdout, stderr) = outputs[hop_result.dest] + + task.log.error( + dest=hop_result.dest, + status=f'Error ({process.returncode})', + failure=f"({fail_count}/{fail_total})", + stdout=stdout, + stderr=stderr, + ) + + if not successes: + task.log.critical("no destinations succeeded") + return task.status.no_host + + # prepare results + results = {hop_result.dest: hop_result.hops for hop_result in hop_results} + + # label results + if isinstance(params.destinations, dict): + results = { + params.destinations[destination]: result + for (destination, result) in results.items() + } + + # flatten results + if params.result.flat: + results = { + f'hops_to_{destination}': result + for (destination, result) in results.items() + } + else: + results = { + destination: {'hops': result} + for (destination, result) in results.items() + } + + # write results + task.result.write(results, + label=params.result.label, + annotate=params.result.annotate) + + return task.status.success + + +class HopResult(typing.NamedTuple): + """Hop number parsed from traceroute outputs for a destination host. + + """ + dest: str + hops: typing.Optional[int] + + @classmethod + def extract(cls, dest, process, stdout, stderr): + """Construct object from traceroute result.""" + if process.returncode == 0: + try: + (*_earlier_lines, last_line) = stdout.splitlines() + + (hop_count, _line_remainder) = last_line.strip().split(' ', 1) + + hop_int = int(hop_count) + except ValueError: + # + # we have a problem! + # + # e.g.: + # + # *) stdout was empty + # *) last line did not contain spaces to split + # *) the first column value was non-numeric + # + pass + else: + return cls(dest, hop_int) + + return cls(dest, None) diff --git a/src/netrics/util/iteration.py b/src/netrics/util/iteration.py new file mode 100644 index 0000000..95748fc --- /dev/null +++ b/src/netrics/util/iteration.py @@ -0,0 +1,33 @@ +import itertools + + +def partition(predicate, iterable): + """Split `iterable` into two disjoint iterables according + to the Boolean return value of `predicate(item)` (where `item` + corresponds to each item of `iterable`). + + This is identical to the composition of + `filter(predicate, iterable)` and + `itertools.filterfalse(predicate, iterable)`, with the distinction + that `iterable` may be an exhaustable generator or iterator, but it + will not be prematurely exhausted, (thanks to `itertools.tee`). + + """ + (t1, t2) = itertools.tee(iterable) + + return ( + filter(predicate, t1), + itertools.filterfalse(predicate, t2), + ) + + +def sequence(predicate, iterable): + """Split `iterable` into two disjoint sequences. + + This function is identical to `partition`, except that sequences of + type `tuple` are returned, (in lieu of exhaustable iterators). + + See: `partition`. + + """ + return tuple(map(tuple, partition(predicate, iterable)))