diff --git a/src/netrics/conf/include/measurements.yaml b/src/netrics/conf/include/measurements.yaml
index 7286b49..c7a9145 100644
--- a/src/netrics/conf/include/measurements.yaml
+++ b/src/netrics/conf/include/measurements.yaml
@@ -1,3 +1,16 @@
+dns-latency:
+  schedule: "H/5 * * * *"
+  param:
+    destinations:
+      - www.amazon.com
+      - chicago.suntimes.com
+      - www.chicagotribune.com
+      - cs.uchicago.edu
+      - www.facebook.com
+      - www.google.com
+      - www.wikipedia.org
+      - www.youtube.com
+
 hops:
   command: hops-traceroute
   schedule: "H/5 * * * *"
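The new `dns-latency` entry above runs the measurement every five minutes (the `H` presumably selecting a hashed minute offset, as in Jenkins-style cron), overriding the module's default destination list. Judging from the `PARAMS` schema added in the module below, the DNS server to query (default `8.8.8.8`) should be overridable the same way; a hypothetical variant:

    dns-latency:
      schedule: "H/5 * * * *"
      param:
        server: 1.1.1.1
        destinations:
          - www.google.com
          - www.wikipedia.org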
diff --git a/src/netrics/measurement/dns_latency.py b/src/netrics/measurement/dns_latency.py
index 5ddaf3b..e787acb 100644
--- a/src/netrics/measurement/dns_latency.py
+++ b/src/netrics/measurement/dns_latency.py
@@ -1,16 +1,187 @@
-import subprocess as sp
+"""Measure query latency statistics to resolve a set of domain names."""
+import operator
+import statistics
+import subprocess
+from types import SimpleNamespace as ns
 
-# Default input parameters
-PARAM_DEFAULTS = {'targets': ['google.com', 'facebook.com', 'nytimes.com']}
+import yaml
+from schema import Optional
 
-def main():
+from netrics import task
+from netrics.util import (
+    iterutils,
+    procutils,
+)
 
-    params = dict(PARAM_DEFAULTS, **json.load(sys.stdin))
+from .common import (
+    default,
+    require_exec,
+    require_net,
+)
 
-    for dst in params['target']:
-        stdout_res[dst] = {}
-        stderr_res[dst] = {}
-        cmd = ['dig',
-if __name__ == '__main__':
-    main()
+
+#
+# dig exit codes
+#
+DIG_CODES = {
+    0: "success",
+    1: "usage error",
+    8: "couldn't open batch file",
+    9: "no reply from server",
+    10: "internal error",
+}
+
+
+#
+# params schema
+#
+PARAMS = task.schema.extend('dns_latency', {
+    # destinations: (dig): list of domain names
+    Optional('destinations',
+             default=default.DESTINATIONS): task.schema.HostnameList(),
+
+    # server: (dig): DNS server to query
+    Optional('server', default='8.8.8.8'): task.schema.IPAddress('server'),
+})
+
+
+@task.param.require(PARAMS)
+@require_exec('dig')
+@require_net
+def main(dig, params):
+    """Measure query latency statistics to resolve a set of domain names.
+
+    The local network, and then internet hosts (as configured in global
+    defaults), are queried first, to ensure network operation and
+    internet accessibility. (See: `require_net`.)
+
+    The `dig` command is then executed, concurrently, for each
+    configured domain name (`destinations`), against the configured DNS
+    server (`server`).
+
+    The mean and maximum values of the query time, reported by `dig`
+    over these invocations, are written out according to configuration
+    (`result`).
+
+    """
+    # parallelize look-ups
+    pool = [
+        subprocess.Popen(
+            (
+                dig,
+                f'@{params.server}',
+                destination,
+                '+yaml',
+            ),
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            text=True,
+        )
+        for destination in params.destinations
+    ]
+
+    # wait and map to completed processes
+    processes = [procutils.complete(process) for process in pool]
+
+    # wrap completed processes with annotative encapsulation
+    trials = [
+        ns(
+            dest=destination,
+            proc=process,
+        )
+        for (destination, process) in zip(params.destinations, processes)
+    ]
+
+    # check for exceptions
+    (failures, successes) = iterutils.sequence(operator.attrgetter('proc.returncode'), trials)
+
+    fail_total = len(failures)
+
+    for (fail_count, failure) in enumerate(failures, 1):
+        task.log.error(
+            dest=failure.dest,
+            status=f'Error ({failure.proc.returncode})',
+            error=DIG_CODES.get(failure.proc.returncode, ""),
+            failure=f"({fail_count}/{fail_total})",
+            stdout=failure.proc.stdout,
+            stderr=failure.proc.stderr,
+        )
+
+    if not successes:
+        task.log.critical("no destinations succeeded")
+        return task.status.no_host
+
+    # prepare results
+    try:
+        times_label = {success.dest: extract_time_ms(success.proc.stdout) for success in successes}
+    except ExtractionError as exc:
+        task.log.critical(
+            error=exc.msg,
+            stdout=exc.stdout,
+            msg='latency extraction error',
+        )
+        return task.status.software_error
+
+    times = times_label.values()
+
+    results = {'avg_ms': statistics.mean(times),
+               'max_ms': max(times)}
+
+    # add'l detail
+    times_sort = sorted(times_label.items(), key=operator.itemgetter(1))
+
+    task.log.info(
+        min_label=dict(times_sort[:1]),
+        mean=round(statistics.mean(times), 1),
+        stdev=round(statistics.stdev(times), 1),
+        max_label=dict(times_sort[-1:]),
+    )
+
+    # flatten results
+    if params.result.flat:
+        results = {f'dns_query_{feature}': value for (feature, value) in results.items()}
+    else:
+        results = {'dns_query': results}
+
+    # write results
+    task.result.write(results,
+                      label=params.result.label,
+                      annotate=params.result.annotate)
+
+    return task.status.success
+
+
+class ExtractionError(ValueError):
+    """Unexpected dig output"""
+
+    def __init__(self, msg, stdout):
+        super().__init__(msg, stdout)
+
+    def __str__(self):
+        return self.msg
+
+    @property
+    def msg(self):
+        return self.args[0]
+
+    @property
+    def stdout(self):
+        return self.args[1]
+
+
+def extract_time_ms(stdout_yaml):
+    """Extract query latency in milliseconds from dig's YAML output."""
+    try:
+        (data,) = yaml.safe_load(stdout_yaml)
+    except ValueError:
+        raise ExtractionError('unexpected output', stdout_yaml)
+
+    try:
+        message = data['message']
+        delta = message['response_time'] - message['query_time']
+    except (KeyError, TypeError):
+        raise ExtractionError('unexpected structure', stdout_yaml)
+
+    seconds = delta.total_seconds()
+
+    return seconds * 1e3
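As context for `extract_time_ms` above: `dig ... +yaml` emits a YAML document whose single top-level entry carries a `message` mapping with `query_time` and `response_time` timestamps, and the function converts their difference to milliseconds. A minimal sketch of that parse; the sample document and its timestamp values are made up here, and real dig output carries many more fields:

    import yaml

    # Illustrative only: a stripped-down stand-in for `dig +yaml` output.
    SAMPLE = """\
    - type: MESSAGE
      message:
        query_time: 2023-01-01 00:00:00.000Z
        response_time: 2023-01-01 00:00:00.042Z
    """

    (data,) = yaml.safe_load(SAMPLE)   # expect exactly one top-level entry
    message = data['message']
    # PyYAML parses the timestamps as datetimes, so subtraction yields a timedelta
    delta = message['response_time'] - message['query_time']
    print(delta.total_seconds() * 1e3)  # roughly 42.0: query latency in ms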
diff --git a/src/netrics/task/schema.py b/src/netrics/task/schema.py
index 2db46fb..cca88e0 100644
--- a/src/netrics/task/schema.py
+++ b/src/netrics/task/schema.py
@@ -3,6 +3,7 @@
 
 """
 import collections.abc
+import ipaddress
 from numbers import Real
 
 from schema import (
@@ -28,20 +29,41 @@
 def DestinationList(name='destinations'):
     return And(
         [Text],
         lambda dests: len(dests) == len(set(dests)),
-        error=f"{name}: must be a non-repeating list of network locators",
+        error=(name and f"{name}: must be a non-repeating list of network locators"),
     )
 
 
 def DestinationCollection(name='destinations'):
     return Or(
         {Text: Text},
-        DestinationList(name),
+        DestinationList(None),
         error=f"{name}: must be non-repeating list "
               "of network locators or mapping of these "
               "to their result labels",
     )
 
 
+def valid_ip(value):
+    try:
+        ipaddress.ip_address(value)
+    except ValueError:
+        return False
+    else:
+        return True
+
+
+def IPAddress(name):
+    return Schema(valid_ip, error=f'{name}: must be an IPv4 or IPv6 address: {{}}')
+
+
+def HostnameList(name='destinations'):
+    return And(
+        DestinationList(None),
+        [lambda value: not valid_ip(value)],
+        error=f'{name}: must be a non-repeating list of hostnames',
+    )
+
+
 def NaturalNumber(name):
     return And(int, lambda value: value > 0,
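A note on the schema changes: `DestinationList` now builds its error as `(name and f"...")`, so callers that pass `name=None`, as `DestinationCollection` and the new `HostnameList` do, suppress the inner message and let the outer validator report its own. A quick sketch of the new validators' behavior, assuming the `schema` package's usual `validate()`/`SchemaError` semantics and with the functions above in scope:

    from schema import SchemaError

    IPAddress('server').validate('8.8.8.8')       # accepted: a valid IPv4 address
    HostnameList().validate(['cs.uchicago.edu',
                             'www.google.com'])   # accepted: unique hostnames

    try:
        # rejected: an IP address is not a hostname
        HostnameList().validate(['8.8.8.8'])
    except SchemaError as exc:
        print(exc)  # destinations: must be a non-repeating list of hostnames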