From e3b03a06e4d59ea8827b76ebf9f886c55968a9f0 Mon Sep 17 00:00:00 2001 From: Sebastian Majewski Date: Fri, 10 Nov 2023 22:38:47 -0600 Subject: [PATCH] Refactored ARP related code. --- .gitignore | 1 - .vscode/settings.json | 5 +- pytcp/protocols/arp/fpa.py | 41 ++++--- pytcp/protocols/arp/fpp.py | 88 ++++++++++----- pytcp/protocols/arp/phrx.py | 176 ++++++++++++++++------------- pytcp/protocols/arp/phtx.py | 14 +-- pytcp/protocols/arp/ps.py | 13 ++- pytcp/subsystems/arp_cache.py | 4 +- pytcp/subsystems/packet_handler.py | 8 +- tests/unit/protocols__arp__fpa.py | 20 ++-- tests/unit/protocols__arp__phtx.py | 6 +- 11 files changed, 222 insertions(+), 154 deletions(-) diff --git a/.gitignore b/.gitignore index 1b057405..c18dd8d8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1 @@ - __pycache__/ diff --git a/.vscode/settings.json b/.vscode/settings.json index 49d4639f..176ca276 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,5 +2,8 @@ "python.analysis.extraPaths": [ "./pytcp" ], - "python.formatting.provider": "black" + "python.formatting.provider": "none", + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + } } \ No newline at end of file diff --git a/pytcp/protocols/arp/fpa.py b/pytcp/protocols/arp/fpa.py index 5cc0cef5..5fd2bd6b 100755 --- a/pytcp/protocols/arp/fpa.py +++ b/pytcp/protocols/arp/fpa.py @@ -41,7 +41,7 @@ from pytcp.lib.ip4_address import Ip4Address from pytcp.lib.mac_address import MacAddress from pytcp.lib.tracker import Tracker -from pytcp.protocols.arp.ps import ARP_HEADER_LEN, ARP_OP_REPLY, ARP_OP_REQUEST +from pytcp.protocols.arp.ps import ARP_HEADER_LEN, ArpOperation from pytcp.protocols.ether.ps import ETHER_TYPE_ARP @@ -59,14 +59,17 @@ def __init__( spa: Ip4Address = Ip4Address(0), tha: MacAddress = MacAddress(0), tpa: Ip4Address = Ip4Address(0), - oper: int = ARP_OP_REQUEST, + oper: ArpOperation = ArpOperation.ARP_OP_REQUEST, echo_tracker: Tracker | None = None, ) -> None: """ Class constructor. """ - assert oper in (ARP_OP_REQUEST, ARP_OP_REPLY), f"{oper=}" + assert oper in ( + ArpOperation.ARP_OP_REQUEST, + ArpOperation.ARP_OP_REPLY, + ), f"{oper=}" self._tracker = Tracker(prefix="TX", echo_tracker=echo_tracker) @@ -74,7 +77,7 @@ def __init__( self._prtype: int = 0x0800 self._hrlen: int = 6 self._prlen: int = 4 - self._oper: int = oper + self._oper: ArpOperation = oper self._sha: MacAddress = sha self._spa: Ip4Address = spa self._tha: MacAddress = tha @@ -84,35 +87,41 @@ def __len__(self) -> int: """ Length of the packet. """ + return ARP_HEADER_LEN def __str__(self) -> str: """ Packet log string. """ - if self._oper == ARP_OP_REQUEST: - return ( - f"ARP request {self._spa} / {self._sha}" - f" > {self._tpa} / {self._tha}" - ) - if self._oper == ARP_OP_REPLY: - return ( - f"ARP reply {self._spa} / {self._sha}" - f" > {self._tpa} / {self._tha}" - ) - return f"ARP request unknown operation {self._oper}" + + match self._oper: + case ArpOperation.ARP_OP_REQUEST: + return ( + f"ARP request {self._spa} / {self._sha}" + f" > {self._tpa} / {self._tha}" + ) + case ArpOperation.ARP_OP_REPLY: + return ( + f"ARP reply {self._spa} / {self._sha}" + f" > {self._tpa} / {self._tha}" + ) + case _: + return f"ARP request unknown operation {self._oper}" @property def tracker(self) -> Tracker: """ Getter for the '_tracker' property. """ + return self._tracker def assemble(self, frame: memoryview) -> None: """ Assemble packet into the raw form. """ + struct.pack_into( "!HH BBH 6s 4s 6s 4s", frame, @@ -121,7 +130,7 @@ def assemble(self, frame: memoryview) -> None: self._prtype, self._hrlen, self._prlen, - self._oper, + self._oper.value, bytes(self._sha), bytes(self._spa), bytes(self._tha), diff --git a/pytcp/protocols/arp/fpp.py b/pytcp/protocols/arp/fpp.py index a572c225..56445f22 100755 --- a/pytcp/protocols/arp/fpp.py +++ b/pytcp/protocols/arp/fpp.py @@ -44,7 +44,14 @@ from pytcp import config from pytcp.lib.ip4_address import Ip4Address from pytcp.lib.mac_address import MacAddress -from pytcp.protocols.arp.ps import ARP_HEADER_LEN, ARP_OP_REPLY, ARP_OP_REQUEST +from pytcp.protocols.arp.ps import ( + ARP_HEADER_LEN, + ARP_HRLEN, + ARP_HRTYPE, + ARP_PRLEN, + ARP_PRTYPE, + ArpOperation, +) if TYPE_CHECKING: from pytcp.lib.packet import PacketRx @@ -72,31 +79,37 @@ def __len__(self) -> int: """ Number of bytes remaining in the frame. """ + return len(self._frame) def __str__(self) -> str: """ Packet log string. """ - if self.oper == ARP_OP_REQUEST: - return ( - f"ARP request {self.spa} / {self.sha}" - f" > {self.tpa} / {self.tha}" - ) - if self.oper == ARP_OP_REPLY: - return ( - f"ARP reply {self.spa} / {self.sha}" - f" > {self.tpa} / {self.tha}" - ) - return f"ARP request unknown operation {self.oper}" + + match self.oper: + case ArpOperation.ARP_OP_REQUEST: + return ( + f"ARP request {self.spa} / {self.sha}" + f" > {self.tpa} / {self.tha}" + ) + case ArpOperation.ARP_OP_REPLY: + return ( + f"ARP reply {self.spa} / {self.sha}" + f" > {self.tpa} / {self.tha}" + ) + case _: + return f"ARP request unknown operation {self.oper}" @property def hrtype(self) -> int: """ Read the 'Hardware address type' field. """ + if "_cache__hrtype" not in self.__dict__: self._cache__hrtype: int = struct.unpack("!H", self._frame[0:2])[0] + return self._cache__hrtype @property @@ -104,8 +117,10 @@ def prtype(self) -> int: """ Read the 'Protocol address type' field. """ + if "_cache__prtype" not in self.__dict__: self._cache__prtype: int = struct.unpack("!H", self._frame[2:4])[0] + return self._cache__prtype @property @@ -113,6 +128,7 @@ def hrlen(self) -> int: """ Read the 'Hardware address length' field. """ + return self._frame[4] @property @@ -120,15 +136,20 @@ def prlen(self) -> int: """ Read the 'Protocol address length' field. """ + return self._frame[5] @property - def oper(self) -> int: + def oper(self) -> ArpOperation: """ Read the 'Operation' field. """ + if "_cache__oper" not in self.__dict__: - self._cache__oper: int = struct.unpack("!H", self._frame[6:8])[0] + self._cache__oper = ArpOperation( + struct.unpack("!H", self._frame[6:8])[0] + ) + return self._cache__oper @property @@ -136,8 +157,10 @@ def sha(self) -> MacAddress: """ Read the 'Sender hardware address' field. """ + if "_cache__sha" not in self.__dict__: self._cache__sha = MacAddress(self._frame[8:14]) + return self._cache__sha @property @@ -145,8 +168,10 @@ def spa(self) -> Ip4Address: """ Read the 'Sender protocol address' field. """ + if "_cache__spa" not in self.__dict__: self._cache__spa = Ip4Address(self._frame[14:18]) + return self._cache__spa @property @@ -154,8 +179,10 @@ def tha(self) -> MacAddress: """ Read the 'Target hardware address' field. """ + if "_cache__tha" not in self.__dict__: self._cache__tha = MacAddress(self._frame[18:24]) + return self._cache__tha @property @@ -163,8 +190,10 @@ def tpa(self) -> Ip4Address: """ Read the 'Target protocol address' field. """ + if "_cache__tpa" not in self.__dict__: self._cache__tpa = Ip4Address(self._frame[24:28]) + return self._cache__tpa @property @@ -172,8 +201,10 @@ def packet_copy(self) -> bytes: """ Read the whole packet. """ + if "_cache__packet_copy" not in self.__dict__: self._cache__packet_copy = bytes(self._frame[:ARP_HEADER_LEN]) + return self._cache__packet_copy def _packet_integrity_check(self) -> str: @@ -186,32 +217,35 @@ def _packet_integrity_check(self) -> str: return "" if len(self) < ARP_HEADER_LEN: - return "ARP integrity - wrong packet length (I)" + return f"ARP integrity - wrong packet length, {len(self)} bytes." return "" def _packet_sanity_check(self) -> str: """ - Packet sanity check to be run on parsed packet to make sure packet's - fields contain sane values + Packet sanity check to be run on parsed packet to make sure + packet's fields contain sane values """ if not config.PACKET_SANITY_CHECK: return "" - if self.hrtype != 1: - return "ARP sanity - 'arp_hrtype' must be 1" + if self.hrtype != ARP_HRTYPE: + return f"ARP sanity - 'arp_hrtype' must be {ARP_HRTYPE}" - if self.prtype != 0x0800: - return "ARP sanity - 'arp_prtype' must be 0x0800" + if self.prtype != ARP_PRTYPE: + return f"ARP sanity - 'arp_prtype' must be {ARP_PRTYPE}" - if self.hrlen != 6: - return "ARP sanity - 'arp_hrlen' must be 6" + if self.hrlen != ARP_HRLEN: + return f"ARP sanity - 'arp_hrlen' must be {ARP_HRLEN}" - if self.prlen != 4: - return "ARP sanity - 'arp_prlen' must be 4" + if self.prlen != ARP_PRLEN: + return f"ARP sanity - 'arp_prlen' must be {ARP_PRLEN}" - if self.oper not in {1, 2}: - return "ARP sanity - 'oper' must be [1-2]" + if self.oper not in ArpOperation: + return ( + f"ARP sanity - 'oper' must be " + f"{[oper.value for oper in ArpOperation]}" + ) return "" diff --git a/pytcp/protocols/arp/phrx.py b/pytcp/protocols/arp/phrx.py index 9ed0e8fc..bf1db893 100755 --- a/pytcp/protocols/arp/phrx.py +++ b/pytcp/protocols/arp/phrx.py @@ -46,7 +46,7 @@ from pytcp.lib import stack from pytcp.lib.logger import log from pytcp.protocols.arp.fpp import ArpParser -from pytcp.protocols.arp.ps import ARP_OP_REPLY, ARP_OP_REQUEST +from pytcp.protocols.arp.ps import ArpOperation if TYPE_CHECKING: from pytcp.lib.packet import PacketRx @@ -72,98 +72,112 @@ def _phrx_arp(self: PacketHandler, packet_rx: PacketRx) -> None: __debug__ and log("arp", f"{packet_rx.tracker} - {packet_rx.arp}") - if packet_rx.arp.oper == ARP_OP_REQUEST: - self.packet_stats_rx.arp__op_request += 1 - # Check if request contains our IP address in SPA field, - # this indicates IP address conflict - if packet_rx.arp.spa in self.ip4_unicast: - self.packet_stats_rx.arp__op_request__ip_conflict += 1 - __debug__ and log( - "arp", - f"{packet_rx.tracker} - IP ({packet_rx.arp.spa}) " - f"conflict detected with host at {packet_rx.arp.sha}", - ) - return + match packet_rx.arp.oper: + case ArpOperation.ARP_OP_REQUEST: + _phrx_arp__request(self, packet_rx) + case ArpOperation.ARP_OP_REPLY: + _phrx_arp__reply(self, packet_rx) - # Check if the request is for one of our IP addresses, - # if so the craft ARP reply packet and send it out - if packet_rx.arp.tpa in self.ip4_unicast: - self.packet_stats_rx.arp__op_request__tpa_stack__respond += 1 - self._phtx_arp( - ether_src=self.mac_unicast, - ether_dst=packet_rx.arp.sha, - arp_oper=ARP_OP_REPLY, - arp_sha=self.mac_unicast, - arp_spa=packet_rx.arp.tpa, - arp_tha=packet_rx.arp.sha, - arp_tpa=packet_rx.arp.spa, - echo_tracker=packet_rx.tracker, - ) - # Update ARP cache with the mapping learned from the received - # ARP request that was destined to this stack - if config.ARP_CACHE_UPDATE_FROM_DIRECT_REQUEST: - self.packet_stats_rx.arp__op_request__update_arp_cache += 1 - __debug__ and log( - "arp", - f"{packet_rx.tracker} - Adding/refreshing " - "ARP cache entry from direct request " - f"- {packet_rx.arp.spa} -> {packet_rx.arp.sha}", - ) - stack.arp_cache.add_entry(packet_rx.arp.spa, packet_rx.arp.sha) - return +def _phrx_arp__request(self: PacketHandler, packet_rx: PacketRx) -> None: + """ + Handle inbound ARP request packets. + """ - else: - # Drop packet if TPA does not match one of our IP addresses - self.packet_stats_rx.arp__op_request__tpa_unknown__drop += 1 - return + self.packet_stats_rx.arp__op_request += 1 + # Check if request contains our IP address in SPA field, + # this indicates IP address conflict + if packet_rx.arp.spa in self.ip4_unicast: + self.packet_stats_rx.arp__op_request__ip_conflict += 1 + __debug__ and log( + "arp", + f"{packet_rx.tracker} - IP ({packet_rx.arp.spa}) " + f"conflict detected with host at {packet_rx.arp.sha}", + ) + return - # Handle ARP reply - elif packet_rx.arp.oper == ARP_OP_REPLY: - self.packet_stats_rx.arp__op_reply += 1 - # Check for ARP reply that is response to our ARP probe, this indicates - # the IP address we trying to claim is in use - if packet_rx.ether.dst == self.mac_unicast: - if ( - packet_rx.arp.spa - in [_.address for _ in self.ip4_host_candidate] - and packet_rx.arp.tha == self.mac_unicast - and packet_rx.arp.tpa.is_unspecified - ): - self.packet_stats_rx.arp__op_reply__ip_conflict += 1 - __debug__ and log( - "arp", - f"{packet_rx.tracker} - ARP Probe detected " - f"conflict for IP {packet_rx.arp.spa} with host at " - f"{packet_rx.arp.sha}", - ) - stack.arp_probe_unicast_conflict.add(packet_rx.arp.spa) - return - - # Update ARP cache with mapping received as direct ARP reply - if packet_rx.ether.dst == self.mac_unicast: - self.packet_stats_rx.arp__op_reply__update_arp_cache += 1 + # Check if the request is for one of our IP addresses, + # if so the craft ARP reply packet and send it out + if packet_rx.arp.tpa in self.ip4_unicast: + self.packet_stats_rx.arp__op_request__tpa_stack__respond += 1 + self._phtx_arp( + ether_src=self.mac_unicast, + ether_dst=packet_rx.arp.sha, + arp_oper=ArpOperation.ARP_OP_REPLY, + arp_sha=self.mac_unicast, + arp_spa=packet_rx.arp.tpa, + arp_tha=packet_rx.arp.sha, + arp_tpa=packet_rx.arp.spa, + echo_tracker=packet_rx.tracker, + ) + + # Update ARP cache with the mapping learned from the received + # ARP request that was destined to this stack + if config.ARP_CACHE_UPDATE_FROM_DIRECT_REQUEST: + self.packet_stats_rx.arp__op_request__update_arp_cache += 1 __debug__ and log( "arp", - f"{packet_rx.tracker} - Adding/refreshing ARP cache entry " - f"from direct reply - {packet_rx.arp.spa} " - f"-> {packet_rx.arp.sha}", + f"{packet_rx.tracker} - Adding/refreshing " + "ARP cache entry from direct request " + f"- {packet_rx.arp.spa} -> {packet_rx.arp.sha}", ) stack.arp_cache.add_entry(packet_rx.arp.spa, packet_rx.arp.sha) - return + return - # Update ARP cache with mapping received as gratuitous ARP reply + else: + # Drop packet if TPA does not match one of our IP addresses + self.packet_stats_rx.arp__op_request__tpa_unknown__drop += 1 + return + + +def _phrx_arp__reply(self: PacketHandler, packet_rx: PacketRx) -> None: + """ + Handle inbound ARP reply packets. + """ + + self.packet_stats_rx.arp__op_reply += 1 + # Check for ARP reply that is response to our ARP probe, this indicates + # the IP address we trying to claim is in use + if packet_rx.ether.dst == self.mac_unicast: if ( - packet_rx.ether.dst.is_broadcast - and packet_rx.arp.spa == packet_rx.arp.tpa - and config.ARP_CACHE_UPDATE_FROM_GRATUITIOUS_REPLY + packet_rx.arp.spa in [_.address for _ in self.ip4_host_candidate] + and packet_rx.arp.tha == self.mac_unicast + and packet_rx.arp.tpa.is_unspecified ): - self.packet_stats_rx.arp__op_reply__update_arp_cache_gratuitous += 1 + self.packet_stats_rx.arp__op_reply__ip_conflict += 1 __debug__ and log( "arp", - f"{packet_rx.tracker} - Adding/refreshing ARP cache entry " - f"from gratuitous reply - {packet_rx.arp.spa} " - f"-> {packet_rx.arp.sha}", + f"{packet_rx.tracker} - ARP Probe detected " + f"conflict for IP {packet_rx.arp.spa} with host at " + f"{packet_rx.arp.sha}", ) - stack.arp_cache.add_entry(packet_rx.arp.spa, packet_rx.arp.sha) + stack.arp_probe_unicast_conflict.add(packet_rx.arp.spa) return + + # Update ARP cache with mapping received as direct ARP reply + if packet_rx.ether.dst == self.mac_unicast: + self.packet_stats_rx.arp__op_reply__update_arp_cache += 1 + __debug__ and log( + "arp", + f"{packet_rx.tracker} - Adding/refreshing ARP cache entry " + f"from direct reply - {packet_rx.arp.spa} " + f"-> {packet_rx.arp.sha}", + ) + stack.arp_cache.add_entry(packet_rx.arp.spa, packet_rx.arp.sha) + return + + # Update ARP cache with mapping received as gratuitous ARP reply + if ( + packet_rx.ether.dst.is_broadcast + and packet_rx.arp.spa == packet_rx.arp.tpa + and config.ARP_CACHE_UPDATE_FROM_GRATUITIOUS_REPLY + ): + self.packet_stats_rx.arp__op_reply__update_arp_cache_gratuitous += 1 + __debug__ and log( + "arp", + f"{packet_rx.tracker} - Adding/refreshing ARP cache entry " + f"from gratuitous reply - {packet_rx.arp.spa} " + f"-> {packet_rx.arp.sha}", + ) + stack.arp_cache.add_entry(packet_rx.arp.spa, packet_rx.arp.sha) + return diff --git a/pytcp/protocols/arp/phtx.py b/pytcp/protocols/arp/phtx.py index a2dc435d..2d5a6507 100755 --- a/pytcp/protocols/arp/phtx.py +++ b/pytcp/protocols/arp/phtx.py @@ -45,7 +45,7 @@ from pytcp.lib.tracker import Tracker from pytcp.lib.tx_status import TxStatus from pytcp.protocols.arp.fpa import ArpAssembler -from pytcp.protocols.arp.ps import ARP_OP_REPLY, ARP_OP_REQUEST +from pytcp.protocols.arp.ps import ArpOperation if TYPE_CHECKING: from pytcp.lib.ip4_address import Ip4Address @@ -57,7 +57,7 @@ def _phtx_arp( *, ether_src: MacAddress, ether_dst: MacAddress, - arp_oper: int, + arp_oper: ArpOperation, arp_sha: MacAddress, arp_spa: Ip4Address, arp_tha: MacAddress, @@ -76,11 +76,11 @@ def _phtx_arp( self.packet_stats_tx.arp__no_proto_support__drop += 1 return TxStatus.DROPED__ARP__NO_PROTOCOL_SUPPORT - if arp_oper == ARP_OP_REQUEST: - self.packet_stats_tx.arp__op_request__send += 1 - - if arp_oper == ARP_OP_REPLY: - self.packet_stats_tx.arp__op_reply__send += 1 + match arp_oper: + case ArpOperation.ARP_OP_REQUEST: + self.packet_stats_tx.arp__op_request__send += 1 + case ArpOperation.ARP_OP_REPLY: + self.packet_stats_tx.arp__op_reply__send += 1 arp_packet_tx = ArpAssembler( oper=arp_oper, diff --git a/pytcp/protocols/arp/ps.py b/pytcp/protocols/arp/ps.py index c268cd3e..3c09faf1 100755 --- a/pytcp/protocols/arp/ps.py +++ b/pytcp/protocols/arp/ps.py @@ -35,6 +35,8 @@ from __future__ import annotations +from enum import Enum + # ARP packet header - IPv4 stack version only # +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ @@ -56,5 +58,12 @@ ARP_HEADER_LEN = 28 -ARP_OP_REQUEST = 1 -ARP_OP_REPLY = 2 +ARP_HRTYPE = 1 +ARP_PRTYPE = 0x0800 +ARP_HRLEN = 6 +ARP_PRLEN = 4 + + +class ArpOperation(Enum): + ARP_OP_REQUEST = 1 + ARP_OP_REPLY = 2 diff --git a/pytcp/subsystems/arp_cache.py b/pytcp/subsystems/arp_cache.py index 8973c46e..ae6738c0 100755 --- a/pytcp/subsystems/arp_cache.py +++ b/pytcp/subsystems/arp_cache.py @@ -46,7 +46,7 @@ from pytcp.lib.ip4_address import Ip4Address from pytcp.lib.logger import log from pytcp.lib.mac_address import MacAddress -from pytcp.protocols.arp.ps import ARP_OP_REQUEST +from pytcp.protocols.arp.ps import ArpOperation class ArpCache: @@ -181,7 +181,7 @@ def _send_arp_request(self, arp_tpa: Ip4Address) -> None: stack.packet_handler._phtx_arp( ether_src=stack.packet_handler.mac_unicast, ether_dst=MacAddress(0xFFFFFFFFFFFF), - arp_oper=ARP_OP_REQUEST, + arp_oper=ArpOperation.ARP_OP_REQUEST, arp_sha=stack.packet_handler.mac_unicast, arp_spa=stack.packet_handler.ip4_unicast[0] if stack.packet_handler.ip4_unicast diff --git a/pytcp/subsystems/packet_handler.py b/pytcp/subsystems/packet_handler.py index 742ffc55..55abedb0 100755 --- a/pytcp/subsystems/packet_handler.py +++ b/pytcp/subsystems/packet_handler.py @@ -56,7 +56,7 @@ from pytcp.lib.packet_stats import PacketStatsRx, PacketStatsTx from pytcp.protocols.arp.phrx import _phrx_arp from pytcp.protocols.arp.phtx import _phtx_arp -from pytcp.protocols.arp.ps import ARP_OP_REPLY, ARP_OP_REQUEST +from pytcp.protocols.arp.ps import ArpOperation from pytcp.protocols.dhcp4.client import Dhcp4Client from pytcp.protocols.ether.phrx import _phrx_ether from pytcp.protocols.ether.phtx import _phtx_ether @@ -473,7 +473,7 @@ def _send_arp_probe(self, ip4_unicast: Ip4Address) -> None: self._phtx_arp( ether_src=self.mac_unicast, ether_dst=MacAddress(0xFFFFFFFFFFFF), - arp_oper=ARP_OP_REQUEST, + arp_oper=ArpOperation.ARP_OP_REQUEST, arp_sha=self.mac_unicast, arp_spa=Ip4Address(0), arp_tha=MacAddress(0), @@ -488,7 +488,7 @@ def _send_arp_announcement(self, ip4_unicast: Ip4Address) -> None: self._phtx_arp( ether_src=self.mac_unicast, ether_dst=MacAddress(0xFFFFFFFFFFFF), - arp_oper=ARP_OP_REQUEST, + arp_oper=ArpOperation.ARP_OP_REQUEST, arp_sha=self.mac_unicast, arp_spa=ip4_unicast, arp_tha=MacAddress(0), @@ -505,7 +505,7 @@ def _send_gratitous_arp(self, ip4_unicast: Ip4Address) -> None: self._phtx_arp( ether_src=self.mac_unicast, ether_dst=MacAddress(0xFFFFFFFFFFFF), - arp_oper=ARP_OP_REPLY, + arp_oper=ArpOperation.ARP_OP_REPLY, arp_sha=self.mac_unicast, arp_spa=ip4_unicast, arp_tha=MacAddress(0), diff --git a/tests/unit/protocols__arp__fpa.py b/tests/unit/protocols__arp__fpa.py index 32b9f053..2aee3e43 100755 --- a/tests/unit/protocols__arp__fpa.py +++ b/tests/unit/protocols__arp__fpa.py @@ -36,7 +36,7 @@ from pytcp.lib.ip4_address import Ip4Address from pytcp.lib.mac_address import MacAddress from pytcp.protocols.arp.fpa import ArpAssembler -from pytcp.protocols.arp.ps import ARP_HEADER_LEN, ARP_OP_REPLY, ARP_OP_REQUEST +from pytcp.protocols.arp.ps import ARP_HEADER_LEN, ArpOperation from pytcp.protocols.ether.ps import ETHER_TYPE_ARP @@ -61,13 +61,13 @@ def test_arp_fpa____init__(self) -> None: spa=Ip4Address("1.2.3.4"), tha=MacAddress("66:77:88:99:AA:BB"), tpa=Ip4Address("5.6.7.8"), - oper=ARP_OP_REPLY, + oper=ArpOperation.ARP_OP_REPLY, ) self.assertEqual(packet._sha, MacAddress("00:11:22:33:44:55")) self.assertEqual(packet._spa, Ip4Address("1.2.3.4")) self.assertEqual(packet._tha, MacAddress("66:77:88:99:AA:BB")) self.assertEqual(packet._tpa, Ip4Address("5.6.7.8")) - self.assertEqual(packet._oper, ARP_OP_REPLY) + self.assertEqual(packet._oper, ArpOperation.ARP_OP_REPLY) def test_arp_fpa____init____defaults(self) -> None: """ @@ -78,26 +78,26 @@ def test_arp_fpa____init____defaults(self) -> None: self.assertEqual(packet._spa, Ip4Address("0.0.0.0")) self.assertEqual(packet._tha, MacAddress("00:00:00:00:00:00")) self.assertEqual(packet._tpa, Ip4Address("0.0.0.0")) - self.assertEqual(packet._oper, ARP_OP_REQUEST) + self.assertEqual(packet._oper, ArpOperation.ARP_OP_REQUEST) def test_arp_fpa____init____assert_oper_request(self) -> None: """ Test assertion for the request operation. """ - ArpAssembler(oper=ARP_OP_REQUEST) + ArpAssembler(oper=ArpOperation.ARP_OP_REQUEST) def test_arp_fpa____init____assert_oper_reply(self) -> None: """ Test assertion for the request operation. """ - ArpAssembler(oper=ARP_OP_REPLY) + ArpAssembler(oper=ArpOperation.ARP_OP_REPLY) def test_arp_fpa____init____assert_oper_unknown(self) -> None: """ Test assertion for the unknown operation. """ with self.assertRaises(AssertionError): - ArpAssembler(oper=-1) + ArpAssembler(oper=-1) # type: ignore def test_arp_fpa____len__(self) -> None: """ @@ -115,7 +115,7 @@ def test_arp_fpa____str____request(self) -> None: spa=Ip4Address("1.2.3.4"), tha=MacAddress("66:77:88:99:AA:BB"), tpa=Ip4Address("5.6.7.8"), - oper=ARP_OP_REQUEST, + oper=ArpOperation.ARP_OP_REQUEST, ) self.assertEqual( str(packet), @@ -132,7 +132,7 @@ def test_arp_fpa____str____reply(self) -> None: spa=Ip4Address("1.2.3.4"), tha=MacAddress("66:77:88:99:AA:BB"), tpa=Ip4Address("5.6.7.8"), - oper=ARP_OP_REPLY, + oper=ArpOperation.ARP_OP_REPLY, ) self.assertEqual( str(packet), @@ -158,7 +158,7 @@ def test_ether_fpa__assemble(self) -> None: spa=Ip4Address("1.2.3.4"), tha=MacAddress("66:77:88:99:AA:BB"), tpa=Ip4Address("5.6.7.8"), - oper=ARP_OP_REPLY, + oper=ArpOperation.ARP_OP_REPLY, ) frame = memoryview(bytearray(len(packet))) packet.assemble(frame) diff --git a/tests/unit/protocols__arp__phtx.py b/tests/unit/protocols__arp__phtx.py index d2c7d487..b6bd3f7e 100755 --- a/tests/unit/protocols__arp__phtx.py +++ b/tests/unit/protocols__arp__phtx.py @@ -36,7 +36,7 @@ from pytcp.lib.packet_stats import PacketStatsTx from pytcp.lib.tx_status import TxStatus -from pytcp.protocols.arp.ps import ARP_OP_REPLY, ARP_OP_REQUEST +from pytcp.protocols.arp.ps import ArpOperation from pytcp.subsystems.packet_handler import PacketHandler from tests.unit.mock_network import ( MockNetworkSettings, @@ -72,7 +72,7 @@ def test_arp_phtx__arp_request(self) -> None: tx_status = self.packet_handler._phtx_arp( ether_src=self.mns.stack_mac_address, ether_dst=self.mns.mac_broadcast, - arp_oper=ARP_OP_REQUEST, + arp_oper=ArpOperation.ARP_OP_REQUEST, arp_sha=self.mns.stack_mac_address, arp_spa=self.mns.stack_ip4_host.address, arp_tha=self.mns.mac_unspecified, @@ -100,7 +100,7 @@ def test_arp_phtx__arp_reply(self) -> None: tx_status = self.packet_handler._phtx_arp( ether_src=self.mns.stack_mac_address, ether_dst=self.mns.host_a_mac_address, - arp_oper=ARP_OP_REPLY, + arp_oper=ArpOperation.ARP_OP_REPLY, arp_sha=self.mns.stack_mac_address, arp_spa=self.mns.stack_ip4_host.address, arp_tha=self.mns.host_a_mac_address,