Skip to content

Commit

Permalink
Refactored ARP related code.
Browse files Browse the repository at this point in the history
  • Loading branch information
ccie18643 committed Nov 11, 2023
1 parent 7e106ef commit 07d5cac
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 154 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

__pycache__/
venv/
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@
"python.analysis.extraPaths": [
"./pytcp"
],
"python.formatting.provider": "black"
"python.formatting.provider": "none",
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
}
}
41 changes: 25 additions & 16 deletions pytcp/protocols/arp/fpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from pytcp.lib.ip4_address import Ip4Address
from pytcp.lib.mac_address import MacAddress
from pytcp.lib.tracker import Tracker
from pytcp.protocols.arp.ps import ARP_HEADER_LEN, ARP_OP_REPLY, ARP_OP_REQUEST
from pytcp.protocols.arp.ps import ARP_HEADER_LEN, ArpOperation
from pytcp.protocols.ether.ps import ETHER_TYPE_ARP


Expand All @@ -59,22 +59,25 @@ def __init__(
spa: Ip4Address = Ip4Address(0),
tha: MacAddress = MacAddress(0),
tpa: Ip4Address = Ip4Address(0),
oper: int = ARP_OP_REQUEST,
oper: ArpOperation = ArpOperation.REQUEST,
echo_tracker: Tracker | None = None,
) -> None:
"""
Class constructor.
"""

assert oper in (ARP_OP_REQUEST, ARP_OP_REPLY), f"{oper=}"
assert oper in (
ArpOperation.REQUEST,
ArpOperation.REPLY,
), f"{oper=}"

self._tracker = Tracker(prefix="TX", echo_tracker=echo_tracker)

self._hrtype: int = 1
self._prtype: int = 0x0800
self._hrlen: int = 6
self._prlen: int = 4
self._oper: int = oper
self._oper: ArpOperation = oper
self._sha: MacAddress = sha
self._spa: Ip4Address = spa
self._tha: MacAddress = tha
Expand All @@ -84,35 +87,41 @@ def __len__(self) -> int:
"""
Length of the packet.
"""

return ARP_HEADER_LEN

def __str__(self) -> str:
"""
Packet log string.
"""
if self._oper == ARP_OP_REQUEST:
return (
f"ARP request {self._spa} / {self._sha}"
f" > {self._tpa} / {self._tha}"
)
if self._oper == ARP_OP_REPLY:
return (
f"ARP reply {self._spa} / {self._sha}"
f" > {self._tpa} / {self._tha}"
)
return f"ARP request unknown operation {self._oper}"

match self._oper:
case ArpOperation.REQUEST:
return (
f"ARP request {self._spa} / {self._sha}"
f" > {self._tpa} / {self._tha}"
)
case ArpOperation.REPLY:
return (
f"ARP reply {self._spa} / {self._sha}"
f" > {self._tpa} / {self._tha}"
)
case _:
return f"ARP request unknown operation {self._oper}"

@property
def tracker(self) -> Tracker:
"""
Getter for the '_tracker' property.
"""

return self._tracker

def assemble(self, frame: memoryview) -> None:
"""
Assemble packet into the raw form.
"""

struct.pack_into(
"!HH BBH 6s 4s 6s 4s",
frame,
Expand All @@ -121,7 +130,7 @@ def assemble(self, frame: memoryview) -> None:
self._prtype,
self._hrlen,
self._prlen,
self._oper,
self._oper.value,
bytes(self._sha),
bytes(self._spa),
bytes(self._tha),
Expand Down
88 changes: 61 additions & 27 deletions pytcp/protocols/arp/fpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,14 @@
from pytcp import config
from pytcp.lib.ip4_address import Ip4Address
from pytcp.lib.mac_address import MacAddress
from pytcp.protocols.arp.ps import ARP_HEADER_LEN, ARP_OP_REPLY, ARP_OP_REQUEST
from pytcp.protocols.arp.ps import (
ARP_HEADER_LEN,
ARP_HRLEN,
ARP_HRTYPE,
ARP_PRLEN,
ARP_PRTYPE,
ArpOperation,
)

if TYPE_CHECKING:
from pytcp.lib.packet import PacketRx
Expand Down Expand Up @@ -72,108 +79,132 @@ def __len__(self) -> int:
"""
Number of bytes remaining in the frame.
"""

return len(self._frame)

def __str__(self) -> str:
"""
Packet log string.
"""
if self.oper == ARP_OP_REQUEST:
return (
f"ARP request {self.spa} / {self.sha}"
f" > {self.tpa} / {self.tha}"
)
if self.oper == ARP_OP_REPLY:
return (
f"ARP reply {self.spa} / {self.sha}"
f" > {self.tpa} / {self.tha}"
)
return f"ARP request unknown operation {self.oper}"

match self.oper:
case ArpOperation.REQUEST:
return (
f"ARP request {self.spa} / {self.sha}"
f" > {self.tpa} / {self.tha}"
)
case ArpOperation.REPLY:
return (
f"ARP reply {self.spa} / {self.sha}"
f" > {self.tpa} / {self.tha}"
)
case _:
return f"ARP request unknown operation {self.oper}"

@property
def hrtype(self) -> int:
"""
Read the 'Hardware address type' field.
"""

if "_cache__hrtype" not in self.__dict__:
self._cache__hrtype: int = struct.unpack("!H", self._frame[0:2])[0]

return self._cache__hrtype

@property
def prtype(self) -> int:
"""
Read the 'Protocol address type' field.
"""

if "_cache__prtype" not in self.__dict__:
self._cache__prtype: int = struct.unpack("!H", self._frame[2:4])[0]

return self._cache__prtype

@property
def hrlen(self) -> int:
"""
Read the 'Hardware address length' field.
"""

return self._frame[4]

@property
def prlen(self) -> int:
"""
Read the 'Protocol address length' field.
"""

return self._frame[5]

@property
def oper(self) -> int:
def oper(self) -> ArpOperation:
"""
Read the 'Operation' field.
"""

if "_cache__oper" not in self.__dict__:
self._cache__oper: int = struct.unpack("!H", self._frame[6:8])[0]
self._cache__oper = ArpOperation(
struct.unpack("!H", self._frame[6:8])[0]
)

return self._cache__oper

@property
def sha(self) -> MacAddress:
"""
Read the 'Sender hardware address' field.
"""

if "_cache__sha" not in self.__dict__:
self._cache__sha = MacAddress(self._frame[8:14])

return self._cache__sha

@property
def spa(self) -> Ip4Address:
"""
Read the 'Sender protocol address' field.
"""

if "_cache__spa" not in self.__dict__:
self._cache__spa = Ip4Address(self._frame[14:18])

return self._cache__spa

@property
def tha(self) -> MacAddress:
"""
Read the 'Target hardware address' field.
"""

if "_cache__tha" not in self.__dict__:
self._cache__tha = MacAddress(self._frame[18:24])

return self._cache__tha

@property
def tpa(self) -> Ip4Address:
"""
Read the 'Target protocol address' field.
"""

if "_cache__tpa" not in self.__dict__:
self._cache__tpa = Ip4Address(self._frame[24:28])

return self._cache__tpa

@property
def packet_copy(self) -> bytes:
"""
Read the whole packet.
"""

if "_cache__packet_copy" not in self.__dict__:
self._cache__packet_copy = bytes(self._frame[:ARP_HEADER_LEN])

return self._cache__packet_copy

def _packet_integrity_check(self) -> str:
Expand All @@ -186,32 +217,35 @@ def _packet_integrity_check(self) -> str:
return ""

if len(self) < ARP_HEADER_LEN:
return "ARP integrity - wrong packet length (I)"
return f"ARP integrity - wrong packet length, {len(self)} bytes."

return ""

def _packet_sanity_check(self) -> str:
"""
Packet sanity check to be run on parsed packet to make sure packet's
fields contain sane values
Packet sanity check to be run on parsed packet to make sure
packet's fields contain sane values
"""

if not config.PACKET_SANITY_CHECK:
return ""

if self.hrtype != 1:
return "ARP sanity - 'arp_hrtype' must be 1"
if self.hrtype != ARP_HRTYPE:
return f"ARP sanity - 'arp_hrtype' must be {ARP_HRTYPE}"

if self.prtype != 0x0800:
return "ARP sanity - 'arp_prtype' must be 0x0800"
if self.prtype != ARP_PRTYPE:
return f"ARP sanity - 'arp_prtype' must be {ARP_PRTYPE}"

if self.hrlen != 6:
return "ARP sanity - 'arp_hrlen' must be 6"
if self.hrlen != ARP_HRLEN:
return f"ARP sanity - 'arp_hrlen' must be {ARP_HRLEN}"

if self.prlen != 4:
return "ARP sanity - 'arp_prlen' must be 4"
if self.prlen != ARP_PRLEN:
return f"ARP sanity - 'arp_prlen' must be {ARP_PRLEN}"

if self.oper not in {1, 2}:
return "ARP sanity - 'oper' must be [1-2]"
if self.oper not in ArpOperation:
return (
f"ARP sanity - 'oper' must be "
f"{[oper.value for oper in ArpOperation]}"
)

return ""
Loading

0 comments on commit 07d5cac

Please sign in to comment.