Skip to content

Commit

Permalink
examples: add network test using RawNetworkDriver
Browse files Browse the repository at this point in the history
Generates an Ethernet frame via scapy using pcap, copies pcap to DUT,
replays pcap on interface, records frame locally (or on exporter, adjust
env.yaml accordingly), and compares both.

Signed-off-by: Bastian Krause <[email protected]>
  • Loading branch information
Bastian-Krause committed Oct 11, 2023
1 parent 9d7a8e0 commit 8d62d81
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
14 changes: 14 additions & 0 deletions examples/network-test/env.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
targets:
main:
resources:
NetworkService:
address: 192.168.1.5
username: root
NetworkInterface:
ifname: enp2s0f3
drivers:
SSHDriver: {}
RawNetworkInterfaceDriver: {}
options:
local_iface_to_dut_iface:
enp2s0f3: uplink
53 changes: 53 additions & 0 deletions examples/network-test/pkg-replay-record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python3
# Generates an Ethernet frame via scapy using pcap, copies pcap to DUT, replays pcap on interface,
# records frame locally (or on exporter, adjust env.yaml accordingly), and compares both.

import logging
from tempfile import NamedTemporaryFile, TemporaryDirectory

from labgrid import Environment
from labgrid.logging import basicConfig, StepLogger
from scapy.all import Ether, Raw, rdpcap, wrpcap, conf

def generate_frame():
frame = [Ether(dst="11:22:33:44:55:66", src="66:55:44:33:22:11", type=0x9000)]
padding = "\x00" * (conf.min_pkt_size - len(frame))
frame = frame[0] / Raw(load=padding)
return frame


basicConfig(level=logging.DEBUG)
StepLogger.start()
env = Environment("env.yaml")
target = env.get_target()

netdrv = target.get_driver("RawNetworkInterfaceDriver")
ssh = target.get_driver("SSHDriver")

# interface names
exporter_iface = netdrv.iface.ifname
dut_iface = env.config.get_target_option(target.name, "local_iface_to_dut_iface")[exporter_iface]

# generate test frame
generated_frame = generate_frame()

# write pcap, copy to DUT
remote_pcap = "/tmp/pcap"
with NamedTemporaryFile() as pcap:
wrpcap(pcap.name, generated_frame)
ssh.put(pcap.name, remote_pcap)

# start record on exporter
ethers = filter(lambda frame: isinstance(frame, Ether), generated_frame)
cap_proc = netdrv.record(count=len(list(ethers)))

# replay pcap on DUT
ssh.run_check(f"ip link set {dut_iface} up")
ssh.run_check(f"tcpreplay -i {dut_iface} {remote_pcap}")

# copy recorded pcap from DUT, compare with generated frame
with TemporaryDirectory() as tempdir:
tempf = f"{tempdir}/record.pcap"
netdrv.get_record(cap_proc, tempf)
remote_frame = rdpcap(tempf)
assert remote_frame[0] == generated_frame[0]

0 comments on commit 8d62d81

Please sign in to comment.