Skip to content

Commit

Permalink
driver: add RawNetworkInterfaceDriver
Browse files Browse the repository at this point in the history
This driver allows "raw" control of a network interface (such as Ethernet
or WiFi).

Signed-off-by: Bastian Krause <[email protected]>
Signed-off-by: Rouven Czerwinski <[email protected]>
  • Loading branch information
Bastian-Krause committed Oct 11, 2023
1 parent 9b6cc95 commit 9d7a8e0
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 0 deletions.
33 changes: 33 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2887,6 +2887,39 @@ It supports:
- connection sharing (DHCP server with NAT)
- listing DHCP leases (if the client has sufficient permissions)

Binds to:
iface:
- `NetworkInterface`_
- `USBNetworkInterface`_
- `RemoteNetworkInterface`_

Implements:
- None yet

Arguments:
- None

RawNetworkInterfaceDriver
~~~~~~~~~~~~~~~~~~~~~~~~~
This driver allows "raw" control of a network interface (such as Ethernet or
WiFi).

The labgrid-raw-interface helper (``helpers/labgrid-raw-interface``) needs to
be installed in the PATH and usable via sudo without password.
A configuration file ``/etc/labgrid/helpers.yaml`` should be installed on hosts
exporting network interfaces for the RawNetworkInterfaceDriver, e.g.:

.. code-block:: yaml
raw-interface:
denied-interfaces:
- eth1
It supports:
- recording traffic
- replaying traffic
- basic statistic collection

Binds to:
iface:
- `NetworkInterface`_
Expand Down
1 change: 1 addition & 0 deletions labgrid/driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from .httpvideodriver import HTTPVideoDriver
from .networkinterfacedriver import NetworkInterfaceDriver
from .provider import HTTPProviderDriver, NFSPProviderDriver, TFTPProviderDriver
from .rawnetworkinterfacedriver import RawNetworkInterfaceDriver
from .mqtt import TasmotaPowerDriver
from .manualswitchdriver import ManualSwitchDriver
from .usbtmcdriver import USBTMCDriver
Expand Down
90 changes: 90 additions & 0 deletions labgrid/driver/rawnetworkinterfacedriver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# pylint: disable=no-member
import json
import signal
import subprocess
import shutil
import tempfile

import attr

from .common import Driver
from ..factory import target_factory
from ..step import step
from ..util.helper import processwrapper
from ..util.managedfile import ManagedFile
from ..resource.common import NetworkResource


@target_factory.reg_driver
@attr.s(eq=False)
class RawNetworkInterfaceDriver(Driver):
bindings = {
"iface": {"NetworkInterface", "RemoteNetworkInterface", "USBNetworkInterface"},
}

def __attrs_post_init__(self):
super().__attrs_post_init__()
self.proc_pcap_map = {}

def _get_wrapper_prefix(self):
return self.iface.command_prefix + ["sudo", "labgrid-raw-interface"]

@Driver.check_active
@step(args=["count"])
def record(self, *, count=None):
capture = tempfile.NamedTemporaryFile(prefix="lg-raw-int-cap-")
cmd = self._get_wrapper_prefix() + ["tcpdump", self.iface.ifname]
if count is not None:
cmd.append(str(count))

proc = subprocess.Popen(cmd, stdout=capture, stderr=subprocess.PIPE)
self.proc_pcap_map[proc] = capture
return proc

@Driver.check_active
@step(args=["proc", "filename"])
def get_record(self, proc, filename, *, timeout=None):
assert proc in self.proc_pcap_map

try:
_, err = proc.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
proc.send_signal(signal.SIGINT)
_, err = proc.communicate()

assert proc.returncode == 0, f"tcpdump err={err}"

capfile = self.proc_pcap_map[proc]
shutil.copy(capfile.name, filename)
capfile.close()

@Driver.check_active
@step(args=["filename"])
def start_replay(self, filename):
mf = ManagedFile(filename, self.iface)
mf.sync_to_resource()

if isinstance(self.iface, NetworkResource):
cmd = self.iface.command_prefix
cmd.append(f"sudo labgrid-raw-interface tcpreplay {self.iface.ifname} < {mf.get_remote_path()}")
indata = None
else:
cmd = self._get_wrapper_prefix() + ["tcpreplay", self.iface.ifname]
indata = open(mf.get_remote_path(), "rb").read()
return subprocess.run(cmd, input=indata)

@Driver.check_active
@step()
def get_statistics(self):
cmd = self.iface.command_prefix + [
"ip",
"--json",
"-stats", "-stats",
"link", "show",
self.iface.ifname]
output = processwrapper.check_output(cmd)
return json.loads(output)[0]

@Driver.check_active
def get_address(self):
return self.get_statistics()["address"]

0 comments on commit 9d7a8e0

Please sign in to comment.