
last-mile latency: scamper version #32

Merged (4 commits, Feb 28, 2023)
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -11,7 +11,7 @@ packages = [{include = "netrics", from = "src"}]

[tool.poetry.dependencies]
python = "^3.8"
fate-scheduler = "0.1.0-rc.6"
fate-scheduler = "0.1.0-rc.7"
netifaces = "^0.11.0"

[tool.poetry.dev-dependencies]
@@ -25,6 +25,8 @@ netrics = "netrics:main"
# built-in measurement modules
netrics-dev = "netrics.measurement.dev:main"
netrics-dns-latency = "netrics.measurement.dns_latency:main"
netrics-lml = "netrics.measurement.lml:main"
netrics-lml-scamper = "netrics.measurement.lml:main"
netrics-ndt7 = "netrics.measurement.ndt7:main"
netrics-ookla = "netrics.measurement.ookla:main"
netrics-ping = "netrics.measurement.ping:main"
5 changes: 4 additions & 1 deletion src/netrics/conf/include/measurements.yaml
@@ -1,5 +1,5 @@
ping:
schedule: "*/5 * * * *"
schedule: "H/5 * * * *"
param:
destinations:
# network locator: results label
@@ -23,3 +23,6 @@ ping:
77.67.119.129: Paris
195.89.146.193: Stockholm
190.98.158.1: Sao_Paulo

# lml-scamper:
# schedule: "H/5 * * * *"
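
Presumably the switch from "*/5" to "H/5" lets the scheduler spread each device's start time within the five-minute window rather than firing every device at the same second; that reading of the cron syntax is an assumption, not documented in this diff. Likewise, if the commented block is any guide, enabling the scamper-based measurement on a device would amount to uncommenting it:

    # hypothetical: uncommented form of the block above
    lml-scamper:
      schedule: "H/5 * * * *"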
86 changes: 0 additions & 86 deletions src/netrics/measurement/builtin/netrics-lml.py

This file was deleted.

8 changes: 7 additions & 1 deletion src/netrics/measurement/common/__init__.py
@@ -1 +1,7 @@
from .connectivity import require_lan # noqa: F401
from .connectivity import ( # noqa: F401
default,
require_lan,
require_net,
)

from .dns import AddressLookups # noqa: F401
4 changes: 3 additions & 1 deletion src/netrics/measurement/common/connectivity/__init__.py
@@ -2,4 +2,6 @@

from .command import ping_dest_once, ping_dest_succeed_once # noqa: F401

from .decorator import require_lan # noqa: F401
from .decorator import require_lan, require_net # noqa: F401

from . import default # noqa: F401
127 changes: 124 additions & 3 deletions src/netrics/measurement/common/connectivity/decorator.py
@@ -1,13 +1,17 @@
"""Measurement decorators to ensure network connectivity."""
import concurrent.futures
import functools
import shutil
import subprocess

import netifaces
from fate.conf.schema import ConfSchema
from fate.util.datastructure import AttributeDict
from schema import Optional, SchemaError

from netrics import task
from netrics import conf, task

from . import command
from . import command, default


class RequirementError(Exception):
@@ -19,7 +23,7 @@ def __init__(self, returncode):

class require_lan:
"""Decorator to extend a network measurement function with
preliminary network checks.
preliminary network accessibility checks.

`require_lan` wraps the decorated function such that it will first
ping the host (`localhost`), and then the default gateway, prior to
@@ -48,6 +52,9 @@ def __init__(self, func):
# (also assigns __wrapped__)
functools.update_wrapper(self, func, updated=())

def __repr__(self):
return repr(self.__wrapped__)

def __call__(self, *args, **kwargs):
try:
self.check_requirements()
@@ -106,3 +113,117 @@ def check_requirements(self):
msg="network gateway inaccessible",
)
raise self.RequirementError(task.status.no_host)


class require_net(require_lan):
"""Decorator to extend a network measurement function with
preliminary network and internet accessibility checks.

`require_net` wraps the decorated function such that it will first
execute the checks implemented by `require_lan`; then, it will ping
internet hosts in parallel, prior to proceeding with the measurement
function's own functionality.

Should any internet host respond to a single ping after a configured
number of attempts, measurement will proceed.

Should no internet hosts respond, measurement will be aborted.

For example:

@require_net
def main():
# Now we know at least that the LAN is operational *and* the
# Internet is accessible.
#
# For example, let's *now* attempt to access Google DNS:
#
result = subprocess.run(
['ping', '-c', '1', '8.8.8.8'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return result.returncode

Configuration of internet hosts to ping, and the number of attempts
to make, may be given in the "defaults" file under the extension key
`ext.require_net`, for example:

ext:
require_net:
attempts: 3
destinations:
- google.com
- facebook.com
- nytimes.com

"""
schema = ConfSchema({
Optional('destinations', default=default.PING_DESTINATIONS):
task.schema.DestinationList(),

Optional('attempts', default=command.DEFAULT_ATTEMPTS):
task.schema.NaturalNumber('attempts'),
})

def check_requirements(self):
super().check_requirements()

try:
conf_net = conf.default.ext.require_net
except AttributeError:
conf_net = True
else:
if conf_net is False:
return # disabled

if conf_net is True:
conf_net = AttributeDict()

try:
params = self.schema.validate(conf_net)
except SchemaError as exc:
task.log.critical(check=self.__class__.__name__,
error=str(exc),
msg="configuration error at 'ext.require_net'")
raise self.RequirementError(task.status.conf_error)

# We want to return as soon as we receive ONE ping success; so,
# we needn't test *every* result.
#
        # And, we take it easy on ourselves, and delegate to a thread pool.
#
# Note: we *could* monitor results synchronously (with sleeps), or use
# a *nix feature like os.wait() to good effect; however,
# concurrent.futures makes this *darn* easy, (and this is perhaps the
# direction that this library is going for subprocess management
# regardless). Plus, as attractive as *nix features may be, (and as
# little as Windows is currently under consideration), *this* is likely
# not the place to bind to platform-dependent features.

with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {executor.submit(command.ping_dest_succeed_once,
dest,
params.attempts): dest
for dest in params.destinations}

for future in concurrent.futures.as_completed(futures):
success = future.result()

if success:
task.log.log(
'DEBUG' if success.attempts == 1 else 'WARNING',
dest=futures[future],
tries=success.attempts,
status='OK',
)
return # success!
else:
task.log.critical(
dest=(params.destinations if len(params.destinations) < 4
else params.destinations[:3] + ['...']),
tries=params.attempts,
status='Error',
msg="internet inaccessible",
)
raise self.RequirementError(task.status.no_host)
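
For orientation, a measurement module would presumably consume the new decorator much as the docstring example shows. The sketch below is hypothetical (the new lml module itself is not part of this diff); only the import path, which follows from the common package's exports above, is taken from the PR:

    # hypothetical sketch -- not the lml module added by this PR
    import subprocess

    from netrics.measurement.common import require_net


    @require_net
    def main():
        # require_net has already confirmed that the LAN is up and that at
        # least one configured internet host answered a ping, so a failure
        # here reflects the measurement itself rather than connectivity.
        result = subprocess.run(
            ['ping', '-c', '1', '8.8.8.8'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return result.returncode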
3 changes: 3 additions & 0 deletions src/netrics/measurement/common/connectivity/default.py
@@ -0,0 +1,3 @@
PING_DESTINATIONS = ('google.com',
'facebook.com',
'nytimes.com')
67 changes: 67 additions & 0 deletions src/netrics/measurement/common/dns.py
@@ -0,0 +1,67 @@
"""Common DNS helpers."""
import collections.abc
import ipaddress
import shutil
import subprocess


class AddressLookups(collections.abc.Mapping):
"""Mapping enabling parallelized domain name resolution."""

_dig_path_ = shutil.which('dig')

def __init__(self, destinations):
self._results_ = dict.fromkeys(destinations)

self.queries = {}

self._resolve_()

self.resolved = frozenset(address for address in self._results_.values()
if address is not None)

self.unresolved = tuple(host for (host, address) in self._results_.items()
if address is None)

def _resolve_(self):
for host in self._results_:
try:
ipaddress.ip_address(host)
except ValueError:
if self._dig_path_ is None:
raise FileNotFoundError("dig executable not found")

self.queries[host] = subprocess.Popen(
(self._dig_path_, '+short', host),
stdout=subprocess.PIPE,
text=True,
)
else:
self._results_[host] = host

for (host, process) in self.queries.items():
(stdout, _stderr) = process.communicate()

if process.returncode == 0:
try:
self._results_[host] = stdout.splitlines()[0]
except IndexError:
pass

def __getitem__(self, item):
return self._results_[item]

def __len__(self):
return len(self._results_)

def __iter__(self):
yield from self._results_

def __repr__(self):
map_ = ', '.join(f'{key} => {value}'
for (key, value) in self._results_.items())

return f'<{self.__class__.__name__}: [{map_}]>'

def getkeys(self, value):
return {host for (host, address) in self._results_.items() if address == value}
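
A rough usage sketch of the new mapping (the hostnames here are illustrative, not taken from the PR):

    from netrics.measurement.common import AddressLookups

    # Literal IP addresses pass through unchanged; hostnames are resolved
    # concurrently via `dig +short`, keeping the first answer (or None).
    lookups = AddressLookups(['google.com', '1.1.1.1', 'no-such-host.invalid'])

    lookups['1.1.1.1']               # '1.1.1.1'
    lookups['no-such-host.invalid']  # None
    lookups.resolved                 # frozenset of distinct resolved addresses
    lookups.unresolved               # tuple of hosts that did not resolve
    lookups.getkeys('1.1.1.1')       # hosts that mapped to that address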