Skip to content

Commit

Permalink
measurement: dev: report (and record) network-connected devices
Browse files Browse the repository at this point in the history
Adds measurement `dev` (executable `netrics-dev`) to scan local network
and report on the number of connected devices.

Notably, this change implements Fate task state persistence, applied to
this measurement.

part of #3

part of #4

resolves #29
  • Loading branch information
jesteria committed Apr 12, 2023
1 parent dace07f commit 6d7395c
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 59 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ packages = [{include = "netrics", from = "src"}]

[tool.poetry.dependencies]
python = "^3.8"
fate-scheduler = "0.1.0-rc.8"
fate-scheduler = "0.1.0-rc.9"
netifaces = "^0.11.0"

[tool.poetry.dev-dependencies]
Expand Down
3 changes: 3 additions & 0 deletions src/netrics/conf/include/measurements.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
dev:
schedule: "H/5 * * * *"

dns-latency:
schedule: "H/5 * * * *"
param:
Expand Down
270 changes: 213 additions & 57 deletions src/netrics/measurement/dev.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,235 @@
import subprocess as sp
import json
import sys
"""Count devices connected to the local network."""
import ipaddress
import subprocess
import time
import typing
from datetime import datetime, timedelta

PARAM_DEFAULTS = {"iface": "eth0"}
import netifaces
from descriptors import classonlymethod
from schema import Optional

from netrics import task

from .common import require_exec, require_lan


PARAMS = task.schema.extend('connected_devices_arp', {
Optional('iface', default='eth0'): task.schema.Text,
})

def parse_arp_output(out):
"""
Parses arp output and returns results
"""
res = {}

ts = int(time.time())
devices = set(out.decode('utf-8').strip().split('\n'))
@task.param.require(PARAMS)
@require_exec('nmap', 'arp')
@require_lan
def main(nmap, arp, params):
"""Count devices connected to the local network.
# Parsing
res['n_devs'] = len(devices)
res['devs'] = []
for dev in devices:
res['devs'].append({'name': dev, 'ts': ts})
The local network is queried first to ensure operation.
(See: `require_lan`.)
return res
nmap and arp are then executed to detect devices connected to the
local network. The network interface to query may be configured
(`iface`).
def main():
Devices are recorded by MAC address, their most recent timestamp of
detection persisted to task state.
params = dict(PARAM_DEFAULTS, **json.load(sys.stdin))
Written results consist of the aggregate count of connected devices:
stdout_res = {}
* currently
* for all time
* over 24 hours
* over 7 days
# Get local subnet
route_cmd = f"ip r | grep -v default | grep src | grep {params['iface']} | head -n 1 | awk '{{print $1;}}'"
The structure of written results is configurable (`result`).
"""
# determine device subnet
try:
iface_addrs = netifaces.ifaddresses(params.iface)
except ValueError as exc:
# invalid interface name
task.log.critical(iface=params.iface,
error=str(exc))
return task.status.os_error

try:
subnet_res = sp.run(route_cmd, capture_output=True, shell=True, check=True)
except sp.CalledProcessError as err:
stderr_res = {"exit_code": err.returncode,
"msg": err.stderr.decode('utf-8')}
json.dump(stderr_res, sys.stderr)
sys.exit(err.returncode)
ip_info = iface_addrs[netifaces.AF_INET][0]
except (KeyError, IndexError):
# bizarre interface
task.log.critical(iface=params.iface,
msg='could not locate internet address set')
return task.status.os_error

subnet = subnet_res.stdout.decode('utf-8').strip('\n')
ip_iface = ipaddress.ip_interface('{addr}/{netmask}'.format_map(ip_info))

nmap_cmd = ['nmap', '-sn', subnet]
subnet = str(ip_iface.network)

try:
sp.run(nmap_cmd, capture_output=True, check=True)
except sp.CalledProcessError as err:
stderr_res = {"exit_code": err.returncode,
"msg": err.stderr.decode('utf-8')}
json.dump(stderr_res, sys.stderr)
sys.exit(err.returncode)

arp_cmd = (f"/usr/sbin/arp -i {params['iface']} -n | grep : |"
"grep -v '_gateway' | tr -s ' ' | "
"cut -f3 -d' ' | sort | uniq")

# Run ARP to count devices
subprocess.run(
(
nmap,
'-sn', # no port scan
subnet,
),
# note: we don't actually want output -- unless there's an error
capture_output=True,
check=True,
text=True,
)
except subprocess.CalledProcessError as exc:
# nmap shouldn't really error this way: this is serious
task.log.critical(
dest=subnet,
status=f'Error ({exc.returncode})',
args=exc.cmd,
stdout=exc.stdout,
stderr=exc.stderr,
)
return task.status.software_error

try:
arp_res = sp.run(arp_cmd, capture_output=True, check=True)
except sp.CalledProcessError as err:
stderr_res = {"exit_code": err.returncode,
"msg": err.stderr.decode('utf-8')}
json.dump(stderr_res, sys.stderr)
sys.exit(err.returncode)
process = subprocess.run(
(
arp,
'-e', # attempt to ensure Linux format
'--numeric',
'--device', params.iface,
),
capture_output=True,
check=True,
text=True,
)
except subprocess.CalledProcessError as exc:
# arp shouldn't really error this way: this is serious
task.log.critical(
iface=params.iface,
status=f'Error ({exc.returncode})',
args=exc.cmd,
stdout=exc.stdout,
stderr=exc.stderr,
)
return task.status.software_error

arp_results = ArpResult.parse(process.stdout)

devices = {arp_result.hwaddress for arp_result in arp_results
if arp_result.address != '_gateway' and arp_result.hwaddress != params.iface}

task.log.info(count=len(devices))

# persist state
store = DeviceStore.read()

task.log.debug(state0=store)

store.record(*devices)

task.log.debug(state1=store)

store.write()

# write results
results = {
'active': len(devices),
'total': len(store),
'1day': store.count(timedelta(days=1)),
'1week': store.count(timedelta(days=7)),
}

if params.result.flat:
results = {f'devices_{feature}': value for (feature, value) in results.items()}
else:
results = {'devices': results}

task.result.write(results,
label=params.result.label,
annotate=params.result.annotate)

return task.status.success


class ArpResult(typing.NamedTuple):
    """Connected-device record retrieved from ARP results.

    Each instance holds the first three whitespace-separated columns of
    one line of ARP results text.
    """

    address: str
    hwtype: str
    hwaddress: str

    @classonlymethod
    def parse(cls, output):
        """Construct instances of ArpResult from ARP results text.

        A leading header line (one starting with "address", compared
        case-insensitively) is dropped. Blank lines, and lines with
        fewer columns than the record has fields, are skipped rather
        than allowed to abort parsing.

        Returns a list of ArpResult, empty when `output` has no usable
        lines.
        """
        lines = output.splitlines()

        # guard against empty output: there may be no first line at all
        if lines and lines[0].lower().startswith('address'):
            del lines[0]

        width = len(cls._fields)

        return [cls.extract(line) for line in lines
                if len(line.split()) >= width]

    @classonlymethod
    def extract(cls, line):
        """Construct an instance of ArpResult from a single line of
        ARP results text.

        Only the first three whitespace-separated columns are used; any
        further columns are ignored.

        Raises TypeError (from `_make`) if the line has fewer than
        three columns.
        """
        items = line.split()
        return cls._make(items[:3])


class DeviceStore(dict):
"""Mapping of device MAC addresses to the timestamps of their most
recent detection.
Historical data are read from, updated and written to the task
framework's state record.
"""
@classonlymethod
def read(cls):
    """Construct a store from the task framework's state record.

    A falsy result from `task.state.read()` yields an empty store.
    """
    saved = task.state.read()

    if not saved:
        saved = {}

    return cls(saved)

def write(self):
# Hand this mapping (device -> last-seen timestamp) to the task
# framework's state writer.
task.state.write(self)

def record(self, *devices, ts=None):
    """Mark each of `devices` as detected at time `ts`.

    `ts` may be a datetime, an int or a float; when omitted the current
    time is used. The value is stored as an int, with floats truncated.

    Raises TypeError if `ts` is of any other type.
    """
    if isinstance(ts, datetime):
        stamp = ts.timestamp()
    elif ts is None:
        stamp = time.time()
    else:
        stamp = ts

    if isinstance(stamp, float):
        # whole seconds only
        stamp = int(stamp)
    elif not isinstance(stamp, int):
        raise TypeError(f"timestamp argument 'ts' must be datetime or numeric, not "
                        f"'{stamp.__class__.__name__}'")

    for seen in devices:
        self[seen] = stamp

def query(self, span, before=None):
if isinstance(span, timedelta):
span = span.total_seconds()

if not isinstance(span, (int, float)):
raise TypeError(f"time span argument 'span' must be timedelta or numeric, "
f"not '{span.__class__.__name__}'")

# Parse arp output
stdout_res = parse_arp_output(arp_res)
stderr_res = {"exit_code": arp_res.returncode,
"msg": arp_res.stderr.decode('utf-8')}
if before is None:
before = time.time()
elif isinstance(before, datetime):
before = before.timestamp()

json.dump(stdout_res, sys.stdout)
json.dump(stderr_res, sys.stderr)
if not isinstance(before, (int, float)):
raise TypeError(f"timestamp argument 'before' must be datetime or numeric, "
f"not '{before.__class__.__name__}'")

sys.exit(0)
since = before - span

for (device, last_seen) in self.items():
if before >= last_seen > since:
yield device

if __name__ == '__main__':
main()
def count(self, span, before=None):
    """Return how many devices `query` yields for the given window.

    Both arguments are passed through to `query` unchanged.
    """
    tally = 0

    for _device in self.query(span, before):
        tally += 1

    return tally
2 changes: 1 addition & 1 deletion src/netrics/task/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from fate.task import log # noqa: F401
from fate.task import log, state # noqa: F401

from . import ( # noqa: F401
param,
Expand Down

0 comments on commit 6d7395c

Please sign in to comment.