diff --git a/pyproject.toml b/pyproject.toml index 3a6002c..059be65 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ packages = [{include = "netrics", from = "src"}] [tool.poetry.dependencies] python = "^3.8" -fate-scheduler = "0.1.0-rc.8" +fate-scheduler = "0.1.0-rc.9" netifaces = "^0.11.0" [tool.poetry.dev-dependencies] diff --git a/src/netrics/conf/include/measurements.yaml b/src/netrics/conf/include/measurements.yaml index c7a9145..e96f30d 100644 --- a/src/netrics/conf/include/measurements.yaml +++ b/src/netrics/conf/include/measurements.yaml @@ -1,3 +1,6 @@ +dev: + schedule: "H/5 * * * *" + dns-latency: schedule: "H/5 * * * *" param: diff --git a/src/netrics/measurement/dev.py b/src/netrics/measurement/dev.py index 166c070..5c9a393 100644 --- a/src/netrics/measurement/dev.py +++ b/src/netrics/measurement/dev.py @@ -1,79 +1,235 @@ -import subprocess as sp -import json -import sys +"""Count devices connected to the local network.""" +import ipaddress +import subprocess import time +import typing +from datetime import datetime, timedelta -PARAM_DEFAULTS = {"iface": "eth0"} +import netifaces +from descriptors import classonlymethod +from schema import Optional + +from netrics import task + +from .common import require_exec, require_lan + + +PARAMS = task.schema.extend('connected_devices_arp', { + Optional('iface', default='eth0'): task.schema.Text, +}) -def parse_arp_output(out): - """ - Parses arp output and returns results - """ - res = {} - ts = int(time.time()) - devices = set(out.decode('utf-8').strip().split('\n')) +@task.param.require(PARAMS) +@require_exec('nmap', 'arp') +@require_lan +def main(nmap, arp, params): + """Count devices connected to the local network. - # Parsing - res['n_devs'] = len(devices) - res['devs'] = [] - for dev in devices: - res['devs'].append({'name': dev, 'ts': ts}) + The local network is queried first to ensure operation. + (See: `require_lan`.) - return res + nmap and arp are then executed to detect devices connected to the + local network. The network interface to query may be configured + (`iface`). -def main(): + Devices are recorded by MAC address, their most recent timestamp of + detection persisted to task state. - params = dict(PARAM_DEFAULTS, **json.load(sys.stdin)) + Written results consist of the aggregate count of connected devices: - stdout_res = {} + * currently + * for all time + * over 24 hours + * over 7 days - # Get local subnet - route_cmd = f"ip r | grep -v default | grep src | grep {params['iface']} | head -n 1 | awk '{{print $1;}}'" + The structure of written results is configurable (`result`). + + """ + # determine device subnet + try: + iface_addrs = netifaces.ifaddresses(params.iface) + except ValueError as exc: + # invalid interface name + task.log.critical(iface=params.iface, + error=str(exc)) + return task.status.os_error try: - subnet_res = sp.run(route_cmd, capture_output=True, shell=True, check=True) - except sp.CalledProcessError as err: - stderr_res = {"exit_code": err.returncode, - "msg": err.stderr.decode('utf-8')} - json.dump(stderr_res, sys.stderr) - sys.exit(err.returncode) + ip_info = iface_addrs[netifaces.AF_INET][0] + except (KeyError, IndexError): + # bizarre interface + task.log.critical(iface=params.iface, + msg='could not locate internet address set') + return task.status.os_error - subnet = subnet_res.stdout.decode('utf-8').strip('\n') + ip_iface = ipaddress.ip_interface('{addr}/{netmask}'.format_map(ip_info)) - nmap_cmd = ['nmap', '-sn', subnet] + subnet = str(ip_iface.network) try: - sp.run(nmap_cmd, capture_output=True, check=True) - except sp.CalledProcessError as err: - stderr_res = {"exit_code": err.returncode, - "msg": err.stderr.decode('utf-8')} - json.dump(stderr_res, sys.stderr) - sys.exit(err.returncode) - - arp_cmd = (f"/usr/sbin/arp -i {params['iface']} -n | grep : |" - "grep -v '_gateway' | tr -s ' ' | " - "cut -f3 -d' ' | sort | uniq") - - # Run ARP to count devices + subprocess.run( + ( + nmap, + '-sn', # no port scan + subnet, + ), + # note: we don't actually want output -- unless there's an error + capture_output=True, + check=True, + text=True, + ) + except subprocess.CalledProcessError as exc: + # nmap shouldn't really error this way: this is serious + task.log.critical( + dest=subnet, + status=f'Error ({exc.returncode})', + args=exc.cmd, + stdout=exc.stdout, + stderr=exc.stderr, + ) + return task.status.software_error + try: - arp_res = sp.run(arp_cmd, capture_output=True, check=True) - except sp.CalledProcessError as err: - stderr_res = {"exit_code": err.returncode, - "msg": err.stderr.decode('utf-8')} - json.dump(stderr_res, sys.stderr) - sys.exit(err.returncode) + process = subprocess.run( + ( + arp, + '-e', # attempt to ensure Linux format + '--numeric', + '--device', params.iface, + ), + capture_output=True, + check=True, + text=True, + ) + except subprocess.CalledProcessError as exc: + # arp shouldn't really error this way: this is serious + task.log.critical( + iface=params.iface, + status=f'Error ({exc.returncode})', + args=exc.cmd, + stdout=exc.stdout, + stderr=exc.stderr, + ) + return task.status.software_error + + arp_results = ArpResult.parse(process.stdout) + + devices = {arp_result.hwaddress for arp_result in arp_results + if arp_result.address != '_gateway' and arp_result.hwaddress != params.iface} + + task.log.info(count=len(devices)) + + # persist state + store = DeviceStore.read() + + task.log.debug(state0=store) + + store.record(*devices) + + task.log.debug(state1=store) + + store.write() + + # write results + results = { + 'active': len(devices), + 'total': len(store), + '1day': store.count(timedelta(days=1)), + '1week': store.count(timedelta(days=7)), + } + + if params.result.flat: + results = {f'devices_{feature}': value for (feature, value) in results.items()} + else: + results = {'devices': results} + + task.result.write(results, + label=params.result.label, + annotate=params.result.annotate) + + return task.status.success + + +class ArpResult(typing.NamedTuple): + """Connect device record retrieved from ARP results.""" + + address: str + hwtype: str + hwaddress: str + + @classonlymethod + def parse(cls, output): + """Construct instances of ArpResult from ARP results text.""" + lines = output.splitlines() + + if lines[0].lower().startswith('address'): + del lines[0] + + return [cls.extract(line) for line in lines] + + @classonlymethod + def extract(cls, line): + """Construct an instance of ArpResult from a single line of + ARP results text. + + """ + items = line.split() + return cls._make(items[:3]) + + +class DeviceStore(dict): + """Mapping of device MAC addresses to the timestamps of their most + recent detection. + + Historical data are read from, updated and written to the task + framework's state record. + + """ + @classonlymethod + def read(cls): + return cls(task.state.read() or {}) + + def write(self): + task.state.write(self) + + def record(self, *devices, ts=None): + if ts is None: + ts = time.time() + elif isinstance(ts, datetime): + ts = ts.timestamp() + + if isinstance(ts, float): + ts = int(ts) + + if not isinstance(ts, int): + raise TypeError(f"timestamp argument 'ts' must be datetime or numeric, not " + f"'{ts.__class__.__name__}'") + + for device in devices: + self[device] = ts + + def query(self, span, before=None): + if isinstance(span, timedelta): + span = span.total_seconds() + + if not isinstance(span, (int, float)): + raise TypeError(f"time span argument 'span' must be timedelta or numeric, " + f"not '{span.__class__.__name__}'") - # Parse arp output - stdout_res = parse_arp_output(arp_res) - stderr_res = {"exit_code": arp_res.returncode, - "msg": arp_res.stderr.decode('utf-8')} + if before is None: + before = time.time() + elif isinstance(before, datetime): + before = before.timestamp() - json.dump(stdout_res, sys.stdout) - json.dump(stderr_res, sys.stderr) + if not isinstance(before, (int, float)): + raise TypeError(f"timestamp argument 'before' must be datetime or numeric, " + f"not '{before.__class__.__name__}'") - sys.exit(0) + since = before - span + for (device, last_seen) in self.items(): + if before >= last_seen > since: + yield device -if __name__ == '__main__': - main() + def count(self, span, before=None): + return sum(1 for _device in self.query(span, before)) diff --git a/src/netrics/task/__init__.py b/src/netrics/task/__init__.py index 35f08d5..80007f3 100644 --- a/src/netrics/task/__init__.py +++ b/src/netrics/task/__init__.py @@ -1,4 +1,4 @@ -from fate.task import log # noqa: F401 +from fate.task import log, state # noqa: F401 from . import ( # noqa: F401 param,