hops measurement: traceroute version
Adds task `hops_traceroute` (with executable `netrics-hops-traceroute`).

This version is added to default configuration as `hops`. (An
experimental scamper version is forthcoming.)

part of #28

part of #3
jesteria committed Mar 8, 2023
1 parent 616e105 commit 2e991de
Showing 4 changed files with 210 additions and 0 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -25,6 +25,7 @@ netrics = "netrics:main"
# built-in measurement modules
netrics-dev = "netrics.measurement.dev:main"
netrics-dns-latency = "netrics.measurement.dns_latency:main"
netrics-hops-traceroute = "netrics.measurement.hops_traceroute:main"
netrics-lml = "netrics.measurement.lml:main"
netrics-lml-scamper = "netrics.measurement.lml:main"
netrics-lml-traceroute = "netrics.measurement.lml_traceroute:main"
7 changes: 7 additions & 0 deletions src/netrics/conf/include/measurements.yaml
@@ -1,3 +1,10 @@
hops:
  command: hops-traceroute
  schedule: "H/5 * * * *"
  param:
    destinations:
      www.google.com: google

lml:
  command: lml-traceroute
  schedule: "H/5 * * * *"
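For orientation, a hedged sketch of the results dict that this `hops` entry would assemble (as built in `hops_traceroute.py` below, before `task.result.write` applies any configured labeling or annotation; the hop count of 9 is invented):

# nested layout (default): destination label -> measurement
{'google': {'hops': 9}}

# flat layout (when `result.flat` is set): prefixed keys instead
{'hops_to_google': 9}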
169 changes: 169 additions & 0 deletions src/netrics/measurement/hops_traceroute.py
@@ -0,0 +1,169 @@
"""Trace number of "hops" to target destination(s)"""
import operator
import subprocess
import typing

from schema import Optional

from netrics import task
from netrics.util.iteration import sequence

from .common import (
default,
require_exec,
require_net,
)


#
# params schema
#
PARAMS = task.schema.extend('hops_to_target', {
    # destinations: (traceroute): list of hosts
    #               OR mapping of hosts to their labels (for results)
    Optional('destinations',
             default=default.DESTINATIONS): task.schema.DestinationCollection(),

    # max_hop: (traceroute): natural number
    Optional('max_hop', default='20'): task.schema.NaturalStr('max_hop'),

    # tries: (traceroute): natural number
    Optional('tries', default='5'): task.schema.NaturalStr('tries'),

    # wait: (traceroute): natural number
    Optional('wait', default='2'): task.schema.NaturalStr('wait'),
})


@task.param.require(PARAMS)
@require_exec('traceroute')
@require_net
def main(traceroute, params):
    """Trace the number of "hops" -- *i.e.* intermediary hosts --
    between the client and target destination(s).

    The local network, and then internet hosts (as configured in global
    defaults), are queried first, to ensure network operation and
    internet accessibility. (See: `require_net`.)

    Traceroute is then executed against all configured internet hosts
    (`destinations`) in parallel, according to configured traceroute
    command arguments: `max_hop`, `tries` and `wait`.

    Traceroute outputs are parsed to retrieve the hop number of each
    destination, and structured results are written out according to
    configuration (`result`).

    """
    # parallelize traceroutes
    processes = {
        destination: subprocess.Popen(
            (
                traceroute,
                '--max-hop', params.max_hop,
                '--tries', params.tries,
                '--wait', params.wait,
                destination,
            ),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        ) for destination in params.destinations
    }

    # wait and collect outputs
    outputs = {destination: process.communicate() for (destination, process) in processes.items()}

    # parse results
    hop_results = [
        HopResult.extract(
            destination,
            process,
            *outputs[destination],
        )
        for (destination, process) in processes.items()
    ]

    # check for exceptions
    (successes, failures) = sequence(operator.attrgetter('hops'), hop_results)

    fail_total = len(failures)

    for (fail_count, hop_result) in enumerate(failures, 1):
        process = processes[hop_result.dest]
        (stdout, stderr) = outputs[hop_result.dest]

        task.log.error(
            dest=hop_result.dest,
            status=f'Error ({process.returncode})',
            failure=f"({fail_count}/{fail_total})",
            stdout=stdout,
            stderr=stderr,
        )

    if not successes:
        task.log.critical("no destinations succeeded")
        return task.status.no_host

    # prepare results
    results = {hop_result.dest: hop_result.hops for hop_result in hop_results}

    # label results
    if isinstance(params.destinations, dict):
        results = {
            params.destinations[destination]: result
            for (destination, result) in results.items()
        }

    # flatten results
    if params.result.flat:
        results = {
            f'hops_to_{destination}': result
            for (destination, result) in results.items()
        }
    else:
        results = {
            destination: {'hops': result}
            for (destination, result) in results.items()
        }

    # write results
    task.result.write(results,
                      label=params.result.label,
                      annotate=params.result.annotate)

    return task.status.success


class HopResult(typing.NamedTuple):
    """Hop number parsed from traceroute outputs for a destination host.
    """
    dest: str
    hops: typing.Optional[int]

    @classmethod
    def extract(cls, dest, process, stdout, stderr):
        """Construct object from traceroute result."""
        if process.returncode == 0:
            try:
                (*_earlier_lines, last_line) = stdout.splitlines()

                (hop_count, _line_remainder) = last_line.strip().split(' ', 1)

                hop_int = int(hop_count)
            except ValueError:
                #
                # we have a problem!
                #
                # e.g.:
                #
                #   *) stdout was empty
                #   *) last line did not contain spaces to split
                #   *) the first column value was non-numeric
                #
                pass
            else:
                return cls(dest, hop_int)

        return cls(dest, None)
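To illustrate `HopResult.extract`, a minimal sketch with an invented traceroute output (assuming the common Linux traceroute format, in which the final line begins with the hop number; the hosts and timings are made up):

import types

sample_stdout = (
    'traceroute to www.google.com (142.250.190.36), 20 hops max, 60 byte packets\n'
    ' 1  192.168.1.1 (192.168.1.1)  0.512 ms  0.487 ms\n'
    ' 9  ord38s31-in-f4.1e100.net (142.250.190.36)  12.3 ms  12.1 ms\n'
)

# stand-in for the completed Popen object (extract consults only returncode)
fake_process = types.SimpleNamespace(returncode=0)

print(HopResult.extract('www.google.com', fake_process, sample_stdout, ''))
# HopResult(dest='www.google.com', hops=9)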
33 changes: 33 additions & 0 deletions src/netrics/util/iteration.py
@@ -0,0 +1,33 @@
import itertools


def partition(predicate, iterable):
    """Split `iterable` into two disjoint iterables according
    to the Boolean return value of `predicate(item)` (where `item`
    corresponds to each item of `iterable`).

    This is identical to the composition of
    `filter(predicate, iterable)` and
    `itertools.filterfalse(predicate, iterable)`, with the distinction
    that `iterable` may be an exhaustable generator or iterator, but it
    will not be prematurely exhausted, (thanks to `itertools.tee`).

    """
    (t1, t2) = itertools.tee(iterable)

    return (
        filter(predicate, t1),
        itertools.filterfalse(predicate, t2),
    )


def sequence(predicate, iterable):
    """Split `iterable` into two disjoint sequences.

    This function is identical to `partition`, except that sequences of
    type `tuple` are returned, (in lieu of exhaustable iterators).

    See: `partition`.

    """
    return tuple(map(tuple, partition(predicate, iterable)))
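A brief usage sketch of these helpers (values invented), mirroring how `sequence` splits the hop results above into successes and failures:

(evens, odds) = partition(lambda n: n % 2 == 0, range(6))
print(list(evens), list(odds))  # [0, 2, 4] [1, 3, 5]

# sequence() returns realized tuples rather than lazy iterators
(evens, odds) = sequence(lambda n: n % 2 == 0, range(6))
print(evens, odds)  # (0, 2, 4) (1, 3, 5)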
