diff --git a/pyproject.toml b/pyproject.toml index d834504..89924fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ packages = [{include = "netrics", from = "src"}] [tool.poetry.dependencies] python = "^3.8" -fate-scheduler = "0.1.0-rc.6" +fate-scheduler = "0.1.0-rc.7" netifaces = "^0.11.0" [tool.poetry.dev-dependencies] @@ -25,6 +25,8 @@ netrics = "netrics:main" # built-in measurement modules netrics-dev = "netrics.measurement.dev:main" netrics-dns-latency = "netrics.measurement.dns_latency:main" +netrics-lml = "netrics.measurement.lml:main" +netrics-lml-scamper = "netrics.measurement.lml:main" netrics-ndt7 = "netrics.measurement.ndt7:main" netrics-ookla = "netrics.measurement.ookla:main" netrics-ping = "netrics.measurement.ping:main" diff --git a/src/netrics/conf/include/measurements.yaml b/src/netrics/conf/include/measurements.yaml index f5e343a..5d47515 100644 --- a/src/netrics/conf/include/measurements.yaml +++ b/src/netrics/conf/include/measurements.yaml @@ -1,5 +1,5 @@ ping: - schedule: "*/5 * * * *" + schedule: "H/5 * * * *" param: destinations: # network locator: results label @@ -23,3 +23,6 @@ ping: 77.67.119.129: Paris 195.89.146.193: Stockholm 190.98.158.1: Sao_Paulo + +# lml-scamper: +# schedule: "H/5 * * * *" diff --git a/src/netrics/measurement/builtin/netrics-lml.py b/src/netrics/measurement/builtin/netrics-lml.py deleted file mode 100644 index 9eb99e1..0000000 --- a/src/netrics/measurement/builtin/netrics-lml.py +++ /dev/null @@ -1,86 +0,0 @@ -import subprocess as sp -import json -import sys -import ipaddress - -# Default input parameters -PARAM_DEFAULTS = {'target': '8.8.8.8'} - -def output_parser(out): - """ - Parses traceroute output and returns last mile info - """ - - res = {} - - res['src'] = out['src'] - res['dst'] = out['dst'] - res['attempts'] = out['attempts'] - - for i in range(out['probe_count']): - hop = out['hops'][i] - - # Check to see if we have ID'ed last mile hop IP addr - if 'last_mile_ip' in res: - if hop['addr'] != res['last_mile_ip']: - break - else: - res['rtts'].append(hop['rtt']) - - # Otherwise, see if this is last mile hop - elif not ipaddress.ip_address(hop['addr']).is_private: - res['last_mile_ip'] = hop['addr'] - res['rtts'] = [hop['rtt']] - - return res - -def error_parser(exit_code, err_msg): - """ - Handles exit code and returns correct error message - - """ - res = {} - - res['exit_code'] = exit_code - if exit_code == 0: - res['msg'] = "Traceroute successful" - if exit_code == 1: - res['msg'] = "Network error" - else: - res['msg'] = err_msg - - return res - -def main(): - - params = dict(PARAM_DEFAULTS, **json.load(sys.stdin)) - - cmd = f'scamper -O json -I "trace -P icmp-paris -q 3 -Q {params["target"]}"' - - # Run scamper traceroute - try: - lml_res = sp.run(cmd, capture_output=True, shell=True, check=True) - except sp.CalledProcessError as err: - stderr_res = {"exit_code": err.returncode, - "msg": err.stderr.decode('utf-8')} - json.dump(stderr_res, sys.stderr) - sys.exit(err.returncode) - - output = lml_res.stdout.decode('utf-8').split('\n')[1] - error = lml_res.stderr.decode('utf-8') - - # Run error parser - stderr_res = error_parser(lml_res.returncode, error) - - # Process test results - stdout_res = output_parser(json.loads(output)) - - # Communicate stdout, stderr, exit code - json.dump(stdout_res, sys.stdout) - json.dump(stderr_res, sys.stderr) - - sys.exit(0) - - -if __name__ == '__main__': - main() diff --git a/src/netrics/measurement/common/__init__.py b/src/netrics/measurement/common/__init__.py index d69b000..dfc6231 100644 --- a/src/netrics/measurement/common/__init__.py +++ b/src/netrics/measurement/common/__init__.py @@ -1 +1,7 @@ -from .connectivity import require_lan # noqa: F401 +from .connectivity import ( # noqa: F401 + default, + require_lan, + require_net, +) + +from .dns import AddressLookups # noqa: F401 diff --git a/src/netrics/measurement/common/connectivity/__init__.py b/src/netrics/measurement/common/connectivity/__init__.py index 9d62311..2094160 100644 --- a/src/netrics/measurement/common/connectivity/__init__.py +++ b/src/netrics/measurement/common/connectivity/__init__.py @@ -2,4 +2,6 @@ from .command import ping_dest_once, ping_dest_succeed_once # noqa: F401 -from .decorator import require_lan # noqa: F401 +from .decorator import require_lan, require_net # noqa: F401 + +from . import default # noqa: F401 diff --git a/src/netrics/measurement/common/connectivity/decorator.py b/src/netrics/measurement/common/connectivity/decorator.py index 1e1f14d..2ab64f9 100644 --- a/src/netrics/measurement/common/connectivity/decorator.py +++ b/src/netrics/measurement/common/connectivity/decorator.py @@ -1,13 +1,17 @@ """Measurement decorators to ensure network connectivity.""" +import concurrent.futures import functools import shutil import subprocess import netifaces +from fate.conf.schema import ConfSchema +from fate.util.datastructure import AttributeDict +from schema import Optional, SchemaError -from netrics import task +from netrics import conf, task -from . import command +from . import command, default class RequirementError(Exception): @@ -19,7 +23,7 @@ def __init__(self, returncode): class require_lan: """Decorator to extend a network measurement function with - preliminary network checks. + preliminary network accessibility checks. `require_lan` wraps the decorated function such that it will first ping the host (`localhost`), and then the default gateway, prior to @@ -48,6 +52,9 @@ def __init__(self, func): # (also assigns __wrapped__) functools.update_wrapper(self, func, updated=()) + def __repr__(self): + return repr(self.__wrapped__) + def __call__(self, *args, **kwargs): try: self.check_requirements() @@ -106,3 +113,117 @@ def check_requirements(self): msg="network gateway inaccessible", ) raise self.RequirementError(task.status.no_host) + + +class require_net(require_lan): + """Decorator to extend a network measurement function with + preliminary network and internet accessibility checks. + + `require_net` wraps the decorated function such that it will first + execute the checks implemented by `require_lan`; then, it will ping + internet hosts in parallel, prior to proceeding with the measurement + function's own functionality. + + Should any internet host respond to a single ping after a configured + number of attempts, measurement will proceed. + + Should no internet hosts respond, measurement will be aborted. + + For example: + + @require_net + def main(): + # Now we know at least that the LAN is operational *and* the + # Internet is accessible. + # + # For example, let's *now* attempt to access Google DNS: + # + result = subprocess.run( + ['ping', '-c', '1', '8.8.8.8'], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + return result.returncode + + Configuration of internet hosts to ping, and the number of attempts + to make, may be given in the "defaults" file under the extension key + `ext.require_net`, for example: + + ext: + require_net: + attempts: 3 + destinations: + - google.com + - facebook.com + - nytimes.com + + """ + schema = ConfSchema({ + Optional('destinations', default=default.PING_DESTINATIONS): + task.schema.DestinationList(), + + Optional('attempts', default=command.DEFAULT_ATTEMPTS): + task.schema.NaturalNumber('attempts'), + }) + + def check_requirements(self): + super().check_requirements() + + try: + conf_net = conf.default.ext.require_net + except AttributeError: + conf_net = True + else: + if conf_net is False: + return # disabled + + if conf_net is True: + conf_net = AttributeDict() + + try: + params = self.schema.validate(conf_net) + except SchemaError as exc: + task.log.critical(check=self.__class__.__name__, + error=str(exc), + msg="configuration error at 'ext.require_net'") + raise self.RequirementError(task.status.conf_error) + + # We want to return as soon as we receive ONE ping success; so, + # we needn't test *every* result. + # + # And, we take in easy on ourselves, and delegate to a thread pool. + # + # Note: we *could* monitor results synchronously (with sleeps), or use + # a *nix feature like os.wait() to good effect; however, + # concurrent.futures makes this *darn* easy, (and this is perhaps the + # direction that this library is going for subprocess management + # regardless). Plus, as attractive as *nix features may be, (and as + # little as Windows is currently under consideration), *this* is likely + # not the place to bind to platform-dependent features. + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = {executor.submit(command.ping_dest_succeed_once, + dest, + params.attempts): dest + for dest in params.destinations} + + for future in concurrent.futures.as_completed(futures): + success = future.result() + + if success: + task.log.log( + 'DEBUG' if success.attempts == 1 else 'WARNING', + dest=futures[future], + tries=success.attempts, + status='OK', + ) + return # success! + else: + task.log.critical( + dest=(params.destinations if len(params.destinations) < 4 + else params.destinations[:3] + ['...']), + tries=params.attempts, + status='Error', + msg="internet inaccessible", + ) + raise self.RequirementError(task.status.no_host) diff --git a/src/netrics/measurement/common/connectivity/default.py b/src/netrics/measurement/common/connectivity/default.py new file mode 100644 index 0000000..003d458 --- /dev/null +++ b/src/netrics/measurement/common/connectivity/default.py @@ -0,0 +1,3 @@ +PING_DESTINATIONS = ('google.com', + 'facebook.com', + 'nytimes.com') diff --git a/src/netrics/measurement/common/dns.py b/src/netrics/measurement/common/dns.py new file mode 100644 index 0000000..2746330 --- /dev/null +++ b/src/netrics/measurement/common/dns.py @@ -0,0 +1,67 @@ +"""Common DNS helpers.""" +import collections.abc +import ipaddress +import shutil +import subprocess + + +class AddressLookups(collections.abc.Mapping): + """Mapping enabling parallelized domain name resolution.""" + + _dig_path_ = shutil.which('dig') + + def __init__(self, destinations): + self._results_ = dict.fromkeys(destinations) + + self.queries = {} + + self._resolve_() + + self.resolved = frozenset(address for address in self._results_.values() + if address is not None) + + self.unresolved = tuple(host for (host, address) in self._results_.items() + if address is None) + + def _resolve_(self): + for host in self._results_: + try: + ipaddress.ip_address(host) + except ValueError: + if self._dig_path_ is None: + raise FileNotFoundError("dig executable not found") + + self.queries[host] = subprocess.Popen( + (self._dig_path_, '+short', host), + stdout=subprocess.PIPE, + text=True, + ) + else: + self._results_[host] = host + + for (host, process) in self.queries.items(): + (stdout, _stderr) = process.communicate() + + if process.returncode == 0: + try: + self._results_[host] = stdout.splitlines()[0] + except IndexError: + pass + + def __getitem__(self, item): + return self._results_[item] + + def __len__(self): + return len(self._results_) + + def __iter__(self): + yield from self._results_ + + def __repr__(self): + map_ = ', '.join(f'{key} => {value}' + for (key, value) in self._results_.items()) + + return f'<{self.__class__.__name__}: [{map_}]>' + + def getkeys(self, value): + return {host for (host, address) in self._results_.items() if address == value} diff --git a/src/netrics/measurement/lml.py b/src/netrics/measurement/lml.py new file mode 100644 index 0000000..05b53fd --- /dev/null +++ b/src/netrics/measurement/lml.py @@ -0,0 +1,232 @@ +"""Measure latency to the "last mile" host via scamper.""" +import json +import random +import shutil +import statistics +import subprocess +from ipaddress import ip_address +from itertools import groupby + +from schema import Optional + +from netrics import task + +from .common import AddressLookups, require_net + + +# +# params schema +# +PARAMS = task.schema.extend('last_mile_rtt', { + # destinations: (scamper): list of hosts (IP address preferred!) + # OR mapping of hosts to their labels (for results) + # (Note: will select ONE) + Optional('destinations', + default={'8.8.8.8': 'Google_DNS', + '1.1.1.1': 'Cloudflare_DNS'}): task.schema.DestinationCollection(), + + # attempts: (scamper): natural number + Optional('attempts', default='3'): task.schema.NaturalStr('attempts'), + + # timeout: (scamper): positive integer seconds + Optional('timeout', default='5'): task.schema.PositiveIntStr('timeout', 'seconds'), + + # include: mapping + Optional('include', + default={'last_mile_ip': False, + 'source_ip': False}): { + # last_mile_ip: include detected IP address of last mile host in results + Optional('last_mile_ip', default=False): bool, + + # source_ip: include device LAN IP address in results + Optional('source_ip', default=False): bool, + }, +}) + + +@task.param.require(PARAMS) +@require_net +def main(params): + """Measure latency to the "last mile" host via scamper. + + The local network, and then internet hosts (as configured in global + defaults), are queried first, to ensure network operation and + internet accessibility. (See: `require_net`.) + + A scamper trace is then executed against a configured internet host. + (A domain name *may* be configured in lieu of an IP address; the + name will first be resolved to its IP.) + + Multiple host targets *may* be specified, in which case the target + to trace is randomly selected; and, in the case of failure, + additional targets will be selected, sequentially. + + The "last mile" host -- *i.e.* the first "hop" outside of the + client's private network -- is identified from scamper trace + results, and this host's "round-trip time" (RTT) to respond is + parsed and written out according to configuration. + + """ + # ensure scamper on PATH + scamper_path = shutil.which('scamper') + if scamper_path is None: + task.log.critical("scamper executable not found") + return task.status.file_missing + + # resolve destination(s) given by domain to IP + address_lookups = AddressLookups(params.destinations) + + for hostname in address_lookups.unresolved: + task.log.error(host=hostname, + status=address_lookups.queries[hostname].returncode, + msg='domain look-up failure') + + if not address_lookups.resolved: + task.log.critical(errors=len(address_lookups.unresolved), + msg='no addresses to query') + return task.status.no_host + + # randomize target from resolved destination(s) + target_ips = list(address_lookups.resolved) + random.shuffle(target_ips) + + # try target(s) falling back sequentially + trace_cmd = f'trace -Q -P icmp-paris -q {params.attempts} -w {params.timeout}' + + for target_ip in target_ips: + try: + process = subprocess.run( + ( + scamper_path, + '-O', 'json', + '-c', trace_cmd, + '-i', target_ip, + ), + capture_output=True, + check=True, + text=True, + ) + except subprocess.CalledProcessError as exc: + # scamper shouldn't really error this way: this is serious + task.log.critical( + dest=target_ip, + status=f'Error ({exc.returncode})', + args=exc.cmd, + stdout=exc.stdout, + stderr=exc.stderr, + ) + return task.status.software_error + + # + # retrieve any returned results + # + # scamper returns results for each target given; we've given + # one target and expect to receive one results set. + # + # however, scamper *could* fail to return proper and/or desired + # results: here, evaluating either to *no* results set (length + # zero) OR to a results set equal to None. we'll treat either + # case the same. + # + try: + (results,) = parse_output(process.stdout) + except ValueError: + results = None + + if results is not None: + # we got what we wanted! we're done. + break + + # + # query against this target failed... + # + # log and continue to next target (if any) + # + task.log.error( + dest=target_ip, + stdout=process.stdout, + stderr=process.stderr, + msg='no result identified', + ) + else: + # no queries succeeded! + task.log.critical( + dests=target_ips, + status='Error', + msg='all queries failed', + ) + return task.status.no_host + + # write results + if not params.include.last_mile_ip: + del results['last_mile_tr_addr'] + + if not params.include.source_ip: + del results['last_mile_tr_src'] + + target_ip = results.pop('last_mile_tr_dst') # preserved from loop but for clarity + + (target_host, *extra_names) = address_lookups.getkeys(target_ip) + + if extra_names: + task.log.warning(dest=target_ip, + msg='destination given by multiple hostnames') + + if isinstance(params.destinations, dict): + target_label = params.destinations[target_host] + else: + target_label = target_host + + if params.result.flat: + del results['last_mile_tr_rtt_ms'] + + results = {f'{target_label}_{key}': value + for (key, value) in results.items()} + else: + results = {target_label: results} + + task.result.write(results, + label=params.result.label, + annotate=params.result.annotate) + + return task.status.success + + +def parse_output(output): + """Parse scamper output to return last mile info.""" + records = (json.loads(line) for line in output.splitlines()) + + return [ + prepare_result(record) + for record in records + if record['type'] == 'trace' + ] + + +def prepare_result(record): + """Construct last mile result from scamper trace record.""" + if record['stop_reason'] != 'COMPLETED': + task.log.warning(dest=record['dst'], + count=record['probe_count'], + stop_reason=record['stop_reason']) + + hop_groups = groupby(record['hops'], lambda hop: hop['addr']) + + for (addr, trials) in hop_groups: + if not ip_address(addr).is_private: + # the first non-private "hop" is the "last mile" + rtts = [trial['rtt'] for trial in trials] + + return { + 'last_mile_tr_dst': record['dst'], + 'last_mile_tr_src': record['src'], + 'last_mile_tr_addr': addr, + 'last_mile_tr_rtt_ms': rtts, + 'last_mile_tr_rtt_max_ms': max(rtts), + 'last_mile_tr_rtt_min_ms': min(rtts), + 'last_mile_tr_rtt_median_ms': statistics.median(rtts), + 'last_mile_tr_rtt_mdev_ms': round(statistics.stdev(rtts), 3), + } + + # no last-mile/WAN data found + return None diff --git a/src/netrics/measurement/ping.py b/src/netrics/measurement/ping.py index e377e2f..9bffca8 100644 --- a/src/netrics/measurement/ping.py +++ b/src/netrics/measurement/ping.py @@ -2,19 +2,12 @@ import re import subprocess from collections import defaultdict -from numbers import Real -from schema import ( - And, - Or, - Optional, - Use, - SchemaError, -) +from schema import Optional from netrics import task -from .common import require_lan +from .common import default, require_lan # @@ -32,65 +25,37 @@ # -# params +# params schema # # input -- a (deserialized) mapping -- is entirely optional. # # a dict, of the optional param keys, their defaults, and validations of -# their values, is given below. +# their values, is given below, (extending the globally-supported input +# parameter schema given by `task.schema`). # - -Text = And(str, len) # non-empty str - -PARAM_SCHEMA = { +PARAMS = task.schema.extend('ping_latency', { # destinations: (ping): list of hosts # OR mapping of hosts to their labels (for results) Optional('destinations', - default=('google.com', - 'facebook.com', - 'nytimes.com')): Or({Text: Text}, - And([Text], - lambda dests: len(dests) == len(set(dests))), - error="destinations: must be non-repeating list " - "of network locators or mapping of these " - "to their result labels"), + default=default.PING_DESTINATIONS): task.schema.DestinationCollection(), # count: (ping): natural number - Optional('count', default='10'): And(int, - lambda count: count > 0, - Use(str), - error="count: int must be greater than 0"), + Optional('count', default='10'): task.schema.NaturalStr('count'), # interval: (ping): int/decimal seconds no less than 2ms - Optional('interval', default='0.25'): And(Real, - lambda interval: interval >= 0.002, - Use(str), - error="interval: seconds must not be less than 2ms"), + Optional('interval', + default='0.25'): task.schema.BoundedRealStr('interval', + 'seconds may be no less than 0.002 (2ms)', + lambda interval: interval >= 0.002), # deadline: (ping): positive integer seconds - Optional('deadline', default='5'): And(int, - lambda deadline: deadline >= 0, - Use(str), - error="deadline: int seconds must not be less than 0"), - - # result: mappping - Optional('result', default={'flat': True, - 'label': 'ping_latency', - 'meta': True}): { - # flat: flatten ping destination results dict to one level - Optional('flat', default=True): bool, - - # wrap: wrap the above (whether flat or not) in a measurement label - Optional('label', default='ping_latency'): Or(False, None, Text), - - # meta: wrap all of the above (whatever it is) with metadata (time, etc.) - Optional('meta', default=True): bool, - }, -} + Optional('deadline', default='5'): task.schema.PositiveIntStr('deadline', 'seconds'), +}) +@task.param.require(PARAMS) @require_lan -def main(): +def main(params): """Measure ping latency to configured hosts. The local network is queried first to ensure operation. @@ -104,13 +69,6 @@ def main(): according to configuration (`result`). """ - # read input params - try: - params = task.param.read(schema=PARAM_SCHEMA) - except SchemaError as exc: - task.log.critical(error=str(exc), msg="input error") - return task.status.conf_error - # parallelize pings processes = { destination: subprocess.Popen( @@ -192,7 +150,7 @@ def main(): # write results task.result.write(results, label=params.result.label, - meta=params.result.meta) + annotate=params.result.annotate) return task.status.success diff --git a/src/netrics/task/__init__.py b/src/netrics/task/__init__.py index a0bc0df..35f08d5 100644 --- a/src/netrics/task/__init__.py +++ b/src/netrics/task/__init__.py @@ -1,8 +1,9 @@ -from fate.task import ( # noqa: F401 - log, +from fate.task import log # noqa: F401 + +from . import ( # noqa: F401 param, + result, + schema, ) -from . import result # noqa: F401 - from .sysexit import status # noqa: F401 diff --git a/src/netrics/task/param.py b/src/netrics/task/param.py new file mode 100644 index 0000000..6994bb2 --- /dev/null +++ b/src/netrics/task/param.py @@ -0,0 +1,59 @@ +import functools + +from schema import SchemaError + +from fate.task.param import read + +import netrics.task + + +class ParamTask: + """Callable wrapper to first read and validate task input parameters + as specified by `schema`. + + This wrapper is deployed by the decorator `require`. + + See: `require`. + + """ + def __init__(self, schema, func): + self.schema = schema + + # assign func's __module__, __name__, etc. + # (but DON'T update __dict__) + # + # (also assigns __wrapped__) + functools.update_wrapper(self, func, updated=()) + + def __repr__(self): + return repr(self.__wrapped__) + + def __call__(self, *args, **kwargs): + # read input params + try: + params = read(schema=self.schema) + except SchemaError as exc: + netrics.task.log.critical(error=str(exc), msg="input error") + return netrics.task.status.conf_error + + return self.__wrapped__(params, *args, **kwargs) + + +class require: + """Wrap the decorated callable to first read and schema-validate + task input parameters. + + Having validated input, the wrapped callable is invoked with the + cleaned parameters as its first argument. + + Upon validation error, the wrapped callable is *not* invoked. The + error is logged and the appropriate status code returned. + + See: `ParamTask`. + + """ + def __init__(self, schema): + self.schema = schema + + def __call__(self, func): + return ParamTask(self.schema, func) diff --git a/src/netrics/task/result.py b/src/netrics/task/result.py index a112286..764da6b 100644 --- a/src/netrics/task/result.py +++ b/src/netrics/task/result.py @@ -4,10 +4,10 @@ import fate.task.result -def write(results, /, label=None, meta=True, **kwargs): +def write(results, /, label=None, annotate=True, **kwargs): """Write task results. - Wraps results in metadata, by default, according to `meta=True`; + Wraps results in metadata, by default, according to `annotate=True`; and, places results under the key `label`, if provided. See `fate.task.result.write` for further details. @@ -16,7 +16,7 @@ def write(results, /, label=None, meta=True, **kwargs): if label: results = {label: results} - if meta: + if annotate: results = { 'Measurements': results, 'Meta': { diff --git a/src/netrics/task/schema.py b/src/netrics/task/schema.py new file mode 100644 index 0000000..8141a16 --- /dev/null +++ b/src/netrics/task/schema.py @@ -0,0 +1,201 @@ +"""Support for construction of schema to validate and to provide +defaults to task input parameters. + +""" +import collections.abc +from numbers import Real + +from schema import ( + And, + Optional, + Or, + Schema, + Use, +) + +import netrics +from netrics.util import lazy + + +# +# Schema composite "primitives" +# + +Text = And(str, len) # non-empty str + + +def DestinationList(name='destinations'): + return And( + [Text], + lambda dests: len(dests) == len(set(dests)), + error=f"{name}: must be a non-repeating list of network locators", + ) + + +def DestinationCollection(name='destinations'): + return Or( + {Text: Text}, + DestinationList(name), + error=f"{name}: must be non-repeating list " + "of network locators or mapping of these " + "to their result labels", + ) + + +def NaturalNumber(name): + return And(int, + lambda value: value > 0, + error=f"{name}: int must be greater than 0") + + +def NaturalStr(name): + return And(NaturalNumber(name), + Use(str)) + + +def PositiveInt(name, unit): + return And(int, + lambda value: value >= 0, + error=f"{name}: int {unit} must not be less than 0") + + +def PositiveIntStr(name, unit): + return And(PositiveInt(name, unit), + Use(str)) + + +def BoundedReal(name, message, boundary): + return And(Real, + boundary, + error=f"{name}: {message}") + + +def BoundedRealStr(*args, **kwargs): + return And(BoundedReal(*args, **kwargs), + Use(str)) + + +# +# Schema of result meta params' global defaults configuration +# +# Values provided by defaults configuration file under key `ext.result`. +# +RESULT_META_DEFAULTS = Schema({ + # + # flat: flatten results dict to one level + # + Optional('flat', default=True): bool, + + # + # label: wrap the above (whether flat or not) in a measurement label + # + # (actual *text* of label provided by measurement and overridden by measurement params) + # + Optional('label', default=True): bool, + + # + # annotate: wrap all of the above (whatever it is) with metadata (time, etc.) + # + Optional('annotate', default=True): bool, +}) + + +def get_default(label): + """Construct schema for globally-supported task parameters. + + Default values are populated from those given by any mapping at key + `ext.result` in the defaults configuration. Note that these values + are themselves validated, but lazily, such that validation errors + are raised only during use of the schema returned by this function. + (See `RESULT_META_DEFAULTS`.) + + As these global parameters concern the handling of task results, the + text `label` of the measurement is required. This label is applied + to the results, unless overridden or disabled by task-level + parameter configuration, or disabled by global default. (See + `ext.result.label`.) + + """ + # our global defaults specified via mapping at `ext` + conf_ext = netrics.conf.default.get('ext') + + # configuration may be empty or null ... that's ok + try: + default_values = conf_ext['result'] + except (KeyError, TypeError): + default_conf = {} + else: + # + # Note: This is an issue with schema-validating configuration + # mappings, which are currently *not* simple instances of dict + # (and not "registered" as such). + # + # This may now be handled using `fate.conf.schema.ConfSchema` in + # lieu of `schema.Schema`! + # + # However, so long as this configuration is so simple as it is + # -- a single-level of booleans -- and passed off to another + # layer which treats it as a simple dict, there's little to gain + # from switching this over. + # + # *Should* this be desired, the below conditional cast may be + # nixed, in lieu of `ConfSchema` above. + # + default_conf = (dict(default_values) + if isinstance(default_values, collections.abc.Mapping) + else default_values) + + # + # we *do* want to validate -- and provide software defaults for -- + # any configuration that's specified + # + # however, we *do not* want to raise validation errors before + # they're expected (e.g. not at the module-level) + # + # luckily, Schema supports *lazy* defaults, retrieved via callable. + # + # the below allows us to retrieve callable *promises* for values in + # the validated defaults configuration. + # + # during validation of the measurement's *actual parameterized input*, + # Schema will invoke these promised defaults, and thereby trigger + # their own schema validations. + # + defaults = lazy.LazyValidator(RESULT_META_DEFAULTS, default_conf) + + # the below are all *callable promises* for our defaults + default_flat = defaults['flat'] + default_annotate = defaults['annotate'] + default_label = lambda: label if defaults['label']() else None # noqa: E731 + + default_result = lambda: {'annotate': default_annotate(), # noqa: E731 + 'label': default_label(), + 'flat': default_flat()} + + return { + # result: mappping + Optional('result', default=default_result): { + # flat: flatten results dict to one level + Optional('flat', default=default_flat): bool, + + # label: wrap the above (whether flat or not) in a measurement label + Optional('label', default=default_label): Or(False, None, Text), + + # annotate: wrap all of the above (whatever it is) with metadata (time, etc.) + Optional('annotate', default=default_annotate): bool, + }, + } + + +def extend(label, schema): + """Construct a task parameter schema extending the globally- + supported task parameter schema. + + The resulting `dict` will contain both schema for validating + globally-supported task parameters *and* task-specific parameters + specified by `schema`. + + See: `get_default`. + + """ + return {**get_default(label), **schema} diff --git a/src/netrics/util/__init__.py b/src/netrics/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/netrics/util/lazy.py b/src/netrics/util/lazy.py new file mode 100644 index 0000000..22234e2 --- /dev/null +++ b/src/netrics/util/lazy.py @@ -0,0 +1,74 @@ +import abc +import collections.abc +import typing +from dataclasses import dataclass + +import schema +from descriptors import cachedproperty + + +class LazilyValidated(abc.ABC): + """Abstract base class of lazy-validation data wrappers. + + Concrete subclasses must implement internal method `_get_value_` to + trigger validation and realization of wrapped data. + + Validated data are accessible via public (caching) property `value` + *and* via invocation of the instance. + + Lazily-evaluated promises for items contained within the data may be + retrieved (recursively) via subscription. + + See `LazyValidator` and `LazyItem`. + + """ + @abc.abstractmethod + def _get_value_(self): + return None + + @cachedproperty + def value(self): + return self._get_value_() + + def __call__(self): + return self.value + + def __getitem__(self, key): + return LazyItem(self, key) + + +@dataclass +class LazyValidator(LazilyValidated): + """Entrypoint for lazy validation of `data` by the given `schema`. + + See `LazilyValidated`. + + """ + schema: schema.Schema + data: collections.abc.Collection + + def _get_value_(self): + return self.schema.validate(self.data) + + def __repr__(self): + value = repr(self.value) if 'value' in self.__dict__ else 'unknown' + return f'<{self.__class__.__name__}: {value}>' + + +@dataclass +class LazyItem(LazilyValidated): + """Lazily-evaluated promise for an item contained within lazily- + validated data. + + See `LazilyValidated` and `LazyValidator`. + + """ + src: LazilyValidated + key: typing.Any + + def _get_value_(self): + return self.src()[self.key] + + def __repr__(self): + value = repr(self.value) if 'value' in self.__dict__ else 'unknown' + return f'<{self.__class__.__name__}[{self.key!r}]: {value}>' diff --git a/test/param/lml.json b/test/param/lml.json deleted file mode 100644 index 681c2e8..0000000 --- a/test/param/lml.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "target": "8.8.8.8" -} diff --git a/test/param/ping.json b/test/param/ping.json deleted file mode 100644 index d05928d..0000000 --- a/test/param/ping.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "targets": [ - "google.com", - "netflix.com", - "nytimes.com", - "facebook.com", - "reddit.com", - "github.com", - "linkedin.com" - ] -} diff --git a/test/param/traceroute.json b/test/param/traceroute.json deleted file mode 100644 index 36c09b1..0000000 --- a/test/param/traceroute.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "targets": [ - "1.1.1.1", - "nytimes.com" - ] -}