Skip to content

Commit

Permalink
Polishes up built-in ping latency measurement for near parity with ex…
Browse files Browse the repository at this point in the history
…isting

…Plus additional functionality:

* complete measurement configurability (via fate/netrics measurements file)

* validation of structured configuration via schema

* pre-measurement checks of execution environment (localhost) and LAN
  (gateway)

* parallelization of internet ping requests against configured targets

Future work:

* "near parity" is defined by the deferral of the command-debugging zip
  archive to future work

Issue tracking:

* part of #3: "built-in measurements"

* resolves #12: "…common/global folder for stdlib netrics tests"

* resolves #17: "netrics-ping returns errors if one address of the list is unreachable…"
  • Loading branch information
jesteria committed Jan 24, 2023
1 parent 8a27743 commit c71d0d3
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 152 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ packages = [{include = "netrics", from = "src"}]

[tool.poetry.dependencies]
python = "^3.8"
fate-scheduler = "0.1.0-rc.2"
fate-scheduler = "0.1.0-rc.3"
netifaces = "^0.11.0"

[tool.poetry.dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion src/netrics/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .__main__ import main, daemon, serve # noqa: F401
from .__main__ import conf, daemon, main, serve # noqa: F401
3 changes: 0 additions & 3 deletions src/netrics/errno.py

This file was deleted.

345 changes: 198 additions & 147 deletions src/netrics/measurement/ping.py
Original file line number Diff line number Diff line change
@@ -1,170 +1,221 @@
import subprocess
import json
"""Measure ping latency to configured hosts."""
import re
import sys
import types

from netrics import errno


# Local error codes
SUCCESS = 0
NO_REPLY = 1
LAN_ERROR = 2

# Default parameters
PARAM_DEFAULTS = {
"targets": [
"google.com",
"facebook.com",
"nytimes.com",
],
"count": 10,
"interval": 0.25,
"timeout": 5,
"verbose": False,
import subprocess
from collections import defaultdict
from numbers import Real

from schema import (
And,
Or,
Optional,
Use,
SchemaError,
)

from netrics import task

from .common import require_lan


#
# ping exit codes
#
# if ping returns any code other than the below something is *very* wrong
#
# (the error code 2 is included -- unclear if ping *can* return anything higher than that.)
#
PING_CODES = {
0, # success
1, # no reply
2, # error (e.g. dns)
}

CONFIG_MESSAGE = "Parameter type error (count and timeout must be of type int)"


def get_params():
"""Ensure the type of the input parameters.
Returns
-------
params: A dict containing input parameters
Raises
------
ValueError

"""
# Read params from stdin with defaults
params = PARAM_DEFAULTS.copy()
if input_ := sys.stdin.read():
params.update(json.loads(input_))
#
# params
#
# input -- a (deserialized) mapping -- is entirely optional.
#
# a dict, of the optional param keys, their defaults, and validations of
# their values, is given below.
#

Text = And(str, len) # non-empty str

PARAM_SCHEMA = {
# destinations: (ping): list of hosts
# OR mapping of hosts to their labels (for results)
Optional('destinations',
default=('google.com',
'facebook.com',
'nytimes.com')): Or({Text: Text},
And([Text],
lambda dests: len(dests) == len(set(dests))),
error="destinations: must be non-repeating list "
"of network locators or mapping of these "
"to their result labels"),

# count: (ping): natural number
Optional('count', default='10'): And(int,
lambda count: count > 0,
Use(str),
error="count: int must be greater than 0"),

# interval: (ping): int/decimal seconds no less than 2ms
Optional('interval', default='0.25'): And(Real,
lambda interval: interval >= 0.002,
Use(str),
error="interval: seconds must not be less than 2ms"),

# deadline: (ping): positive integer seconds
Optional('deadline', default='5'): And(int,
lambda deadline: deadline >= 0,
Use(str),
error="deadline: int seconds must not be less than 0"),

# result: mappping
Optional('result', default={'flat': True,
'label': 'ping_latency',
'meta': True}): {
# flat: flatten ping destination results dict to one level
Optional('flat', default=True): bool,

# wrap: wrap the above (whether flat or not) in a measurement label
Optional('label', default='ping_latency'): Or(False, None, Text),

# meta: wrap all of the above (whatever it is) with metadata (time, etc.)
Optional('meta', default=True): bool,
},
}

# Check type of parameters (count and timeout must be int)
params['interval'] = str(float(params['interval']))
params['count'] = str(int(params['count']))
params['timeout'] = str(int(params['timeout']))

return types.SimpleNamespace(**params)
@require_lan
def main():
"""Measure ping latency to configured hosts.
The local network is queried first to ensure operation.
(See: `require_lan`.)
def result_log(returncode, stderr, verbose):
"""Construct log message for given result.
Ping queries are then executed, in parallel, to each configured host
(`destinations`) according to configured ping command arguments:
`count`, `interval` and `deadline`.
Arguments
---------
returncode: The return code from the ping command.
stderr: Stderr returned by ping.
verbose: Module parameter to indicate verbose output.
Ping outputs are parsed into structured results and written out
according to configuration (`result`).
"""
if returncode == SUCCESS:
return {'retcode': returncode, 'message': "Success"} if verbose else None

if returncode == NO_REPLY:
return {'retcode': returncode,
'message': "Transmission successful, some packet loss"} if verbose else None

if returncode == LAN_ERROR:
return {'retcode': returncode, "message": "Local network error"}

return {'retcode': returncode, 'message': stderr}


def parse_result(output):
"""Parse ping output and returns dict with results."""

# Extract packet loss stats
pkt_loss_match = re.search(r', ([0-9.]*)% packet loss', output, re.MULTILINE)

if pkt_loss_match:
pkt_loss = float(pkt_loss_match.group(1))
else:
pkt_loss = -1.0
# read input params
try:
params = task.param.read(schema=PARAM_SCHEMA)
except SchemaError as exc:
task.log.critical(error=str(exc), msg="input error")
return task.status.conf_error

# parallelize pings
processes = {
destination: subprocess.Popen(
(
'ping',
'-c', params.count,
'-i', params.interval,
'-w', params.deadline,
destination,
),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
) for destination in params.destinations
}

# wait and collect outputs
outputs = {destination: process.communicate() for (destination, process) in processes.items()}

# check for exceptions
failures = [
(destination, process, outputs[destination])
for (destination, process) in processes.items()
if process.returncode not in PING_CODES
]

if failures:
total_failures = len(failures)

# directly log first 3 failures
for (fail_count, (destination, process, (stdout, stderr))) in enumerate(failures[:3], 1):
task.log.critical(
dest=destination,
status=f'Error ({process.returncode})',
failure=f"({fail_count}/{total_failures})",
args=process.args[:-1],
stdout=stdout,
stderr=stderr,
)

if fail_count < total_failures:
task.log.critical(
dest='...',
status='Error (...)',
failure=f"(.../{total_failures})",
args='...',
stdout='...',
stderr='...',
)

return task.status.software_error

# log summary/general results
statuses = defaultdict(int)
for process in processes.values():
statuses[process.returncode] += 1

task.log.info({'dest-status': statuses})

# parse detailed results
results = {
destination: parse_output(stdout)
for (destination, (stdout, _stderr)) in outputs.items()
}

# label results
if isinstance(params.destinations, dict):
results = {
params.destinations[destination]: result
for (destination, result) in results.items()
}

# flatten results
if params.result.flat:
results = {f'{label}_{feature}': value
for (label, data) in results.items()
for (feature, value) in data.items()}

# write results
task.result.write(results,
label=params.result.label,
meta=params.result.meta)

return task.status.success


def parse_output(output):
"""Parse ping output and return dict of results."""

# Extract RTT stats
rtt_match = re.search(
r'rtt [a-z/]* = ([0-9.]*)/([0-9.]*)/([0-9.]*)/([0-9.]*) ms',
output
)

if rtt_match:
rtt_values = [float(value) for value in rtt_match.groups()]
else:
rtt_values = [-1.0] * 4

rtt_keys = ('rtt_min', 'rtt_avg', 'rtt_max', 'rtt_stddev')

rtt_stats = dict(zip(rtt_keys, rtt_values))

return dict(rtt_stats, pkt_loss=pkt_loss)


def main():
# Parse stdin params
try:
params = get_params()
except ValueError:
json.dump({'error': CONFIG_MESSAGE}, sys.stderr)
sys.exit(errno.CONFIG_ERROR)

# Launch pings
procs = []
for dst in params.targets:
args = (
'ping',
'-i', params.interval,
'-c', params.count,
'-w', params.timeout,
dst,
)

proc = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
rtt_values = [float(value) for value in rtt_match.groups()] if rtt_match else [-1.0] * 4

procs.append((dst, proc))
rtt_keys = ('rtt_min_ms', 'rtt_avg_ms', 'rtt_max_ms', 'rtt_mdev_ms')

# Process results
dst_code = SUCCESS
log = {}
result = {}
rtt_stats = zip(rtt_keys, rtt_values)

for (dst, proc) in procs:
proc.wait()

# We'll report the "worst" exit code as our own
dst_code = max(dst_code, proc.returncode)

# Parse ping exit code and write to log if message or error
if dst_log := result_log(proc.returncode, proc.stderr.read(), params.verbose):
log[dst] = dst_log

# If LAN error, don't expect a result
if proc.returncode >= LAN_ERROR:
continue

result[dst] = parse_result(proc.stdout.read())

exit_code = dst_code if dst_code > NO_REPLY else 0

# Write out logs and results
if exit_code == 0:
json.dump(result, sys.stdout)

if log:
json.dump(log, sys.stderr)

sys.exit(exit_code)
# Extract packet loss stats
pkt_loss_match = re.search(r', ([0-9.]*)% packet loss', output, re.MULTILINE)

pkt_loss = float(pkt_loss_match.group(1)) if pkt_loss_match else -1.0

if __name__ == '__main__':
main()
# Return combined dict
return dict(rtt_stats, packet_loss_pct=pkt_loss)
8 changes: 8 additions & 0 deletions src/netrics/task/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from fate.task import ( # noqa: F401
log,
param,
)

from . import result # noqa: F401

from .sysexit import status # noqa: F401
Loading

0 comments on commit c71d0d3

Please sign in to comment.