From ad3147b7dccc65ead5239a32596a27840db97070 Mon Sep 17 00:00:00 2001 From: Sebastian Majewski Date: Mon, 15 Jul 2024 13:40:43 -0500 Subject: [PATCH] Refactor Arp and Ethernet parsers to use from_bytes method for header extraction --- pytcp/lib/ip4_address.py | 8 ++++++ pytcp/lib/mac_address.py | 9 ++++++ pytcp/protocols/arp/header.py | 43 ++++++++++++++-------------- pytcp/protocols/arp/parser.py | 2 +- pytcp/protocols/ethernet/header.py | 17 ++++++----- pytcp/protocols/ethernet/parser.py | 2 +- pytcp/protocols/icmp6/options__nd.py | 16 +++-------- pytcp/protocols/ip4/options.py | 18 ++---------- pytcp/protocols/tcp/options.py | 42 +++++---------------------- 9 files changed, 65 insertions(+), 92 deletions(-) diff --git a/pytcp/lib/ip4_address.py b/pytcp/lib/ip4_address.py index a76aed8d..23cae202 100755 --- a/pytcp/lib/ip4_address.py +++ b/pytcp/lib/ip4_address.py @@ -250,6 +250,14 @@ def multicast_mac(self) -> MacAddress: int(MacAddress(0x01005E000000)) | self._address & 0x7FFFFF ) + @staticmethod + def from_bytes(bytes: bytes) -> Ip4Address: + """ + Create IPv4 address object from bytes. + """ + + return Ip4Address(bytes) + class Ip4Mask(IpMask): """ diff --git a/pytcp/lib/mac_address.py b/pytcp/lib/mac_address.py index b01f2897..93b726b5 100755 --- a/pytcp/lib/mac_address.py +++ b/pytcp/lib/mac_address.py @@ -24,6 +24,7 @@ ############################################################################ # pylint: disable = missing-class-docstring +# pylint: disable = redefined-builtin """ Module contains Ethernet MAC address manipulation class. @@ -187,3 +188,11 @@ def is_broadcast(self) -> bool: """ return self._address == 281474976710655 + + @staticmethod + def from_bytes(bytes: bytes) -> MacAddress: + """ + Create MAC address object from bytes. + """ + + return MacAddress(bytes) diff --git a/pytcp/protocols/arp/header.py b/pytcp/protocols/arp/header.py index 19ba06a4..baa8dcb3 100644 --- a/pytcp/protocols/arp/header.py +++ b/pytcp/protocols/arp/header.py @@ -24,6 +24,7 @@ ############################################################################ # pylint: disable = too-many-instance-attributes +# pylint: disable = redefined-builtin """ Module contains header support classes for the ARP protccol. @@ -74,8 +75,8 @@ class ArpHardwareType(ProtoEnum): ETHERNET = 0x0001 @staticmethod - def _extract(frame: bytes) -> int: - return int.from_bytes(frame[0:2]) + def _from_bytes(bytes: bytes) -> int: + return int.from_bytes(bytes[:2]) class ArpProtocolType(ProtoEnum): @@ -86,8 +87,8 @@ class ArpProtocolType(ProtoEnum): IP4 = 0x0800 @staticmethod - def _extract(frame: bytes) -> int: - return int.from_bytes(frame[2:4]) + def _from_bytes(bytes: bytes) -> int: + return int.from_bytes(bytes[:2]) class ArpHardwareLength(ProtoEnum): @@ -98,8 +99,8 @@ class ArpHardwareLength(ProtoEnum): ETHERNET = 6 @staticmethod - def _extract(frame: bytes) -> int: - return int.from_bytes(frame[4:5]) + def _from_bytes(bytes: bytes) -> int: + return int.from_bytes(bytes[:1]) class ArpProtocolLength(ProtoEnum): @@ -110,8 +111,8 @@ class ArpProtocolLength(ProtoEnum): IP4 = 4 @staticmethod - def _extract(frame: bytes) -> int: - return int.from_bytes(frame[5:6]) + def _from_bytes(bytes: bytes) -> int: + return int.from_bytes(bytes[:1]) class ArpOperation(ProtoEnum): @@ -123,8 +124,8 @@ class ArpOperation(ProtoEnum): REPLY = 2 @staticmethod - def _extract(frame: bytes) -> int: - return int.from_bytes(frame[6:8]) + def _from_bytes(bytes: bytes) -> int: + return int.from_bytes(bytes[:2]) @dataclass @@ -169,21 +170,21 @@ def __bytes__(self) -> bytes: ) @staticmethod - def from_frame(frame: bytes) -> ArpHeader: + def from_bytes(bytes: bytes) -> ArpHeader: """ - Read the header from the frame. + Populate the header structure from bytes. """ return ArpHeader( - hrtype=ArpHardwareType.from_frame(frame), - prtype=ArpProtocolType.from_frame(frame), - hrlen=ArpHardwareLength.from_frame(frame), - prlen=ArpProtocolLength.from_frame(frame), - oper=ArpOperation.from_frame(frame), - sha=MacAddress(frame[8:14]), - spa=Ip4Address(frame[14:18]), - tha=MacAddress(frame[18:24]), - tpa=Ip4Address(frame[24:28]), + hrtype=ArpHardwareType.from_bytes(bytes[0:2]), + prtype=ArpProtocolType.from_bytes(bytes[2:4]), + hrlen=ArpHardwareLength.from_bytes(bytes[4:5]), + prlen=ArpProtocolLength.from_bytes(bytes[5:6]), + oper=ArpOperation.from_bytes(bytes[6:8]), + sha=MacAddress.from_bytes(bytes[8:14]), + spa=Ip4Address.from_bytes(bytes[14:18]), + tha=MacAddress.from_bytes(bytes[18:24]), + tpa=Ip4Address.from_bytes(bytes[24:28]), ) diff --git a/pytcp/protocols/arp/parser.py b/pytcp/protocols/arp/parser.py index 6a4b7df5..d823e81d 100755 --- a/pytcp/protocols/arp/parser.py +++ b/pytcp/protocols/arp/parser.py @@ -117,7 +117,7 @@ def _parse(self) -> None: Parse the packet. """ - self._header = ArpHeader.from_frame(self._frame) + self._header = ArpHeader.from_bytes(self._frame) @override def _validate_sanity(self) -> None: diff --git a/pytcp/protocols/ethernet/header.py b/pytcp/protocols/ethernet/header.py index af3294a8..9d96176c 100644 --- a/pytcp/protocols/ethernet/header.py +++ b/pytcp/protocols/ethernet/header.py @@ -24,6 +24,9 @@ ############################################################################ +# pylint: disable = redefined-builtin + + """ Module contains header support classes for the Ethernet protccol. @@ -68,8 +71,8 @@ class EthernetType(ProtoEnum): RAW = 0xFFFF @staticmethod - def _extract(frame: bytes) -> int: - return int(struct.unpack("! H", frame[12:14])[0]) + def _from_bytes(bytes: bytes) -> int: + return int.from_bytes(bytes[:2]) @dataclass @@ -102,15 +105,15 @@ def __bytes__(self) -> bytes: ) @staticmethod - def from_frame(frame: bytes) -> EthernetHeader: + def from_bytes(bytes: bytes) -> EthernetHeader: """ - Populate the header from the raw frame. + Populate the header structure from bytes. """ return EthernetHeader( - dst=MacAddress(frame[0:6]), - src=MacAddress(frame[6:12]), - type=EthernetType.from_frame(frame), + dst=MacAddress.from_bytes(bytes[0:6]), + src=MacAddress.from_bytes(bytes[6:12]), + type=EthernetType.from_bytes(bytes[12:14]), ) diff --git a/pytcp/protocols/ethernet/parser.py b/pytcp/protocols/ethernet/parser.py index d0ba9e68..b04f2668 100755 --- a/pytcp/protocols/ethernet/parser.py +++ b/pytcp/protocols/ethernet/parser.py @@ -111,7 +111,7 @@ def _parse(self) -> None: Parse the packet. """ - self._header = EthernetHeader.from_frame(self._frame) + self._header = EthernetHeader.from_bytes(self._frame) @override def _validate_sanity(self) -> None: diff --git a/pytcp/protocols/icmp6/options__nd.py b/pytcp/protocols/icmp6/options__nd.py index ee932db9..13835e92 100644 --- a/pytcp/protocols/icmp6/options__nd.py +++ b/pytcp/protocols/icmp6/options__nd.py @@ -231,21 +231,13 @@ def from_bytes(bytes: bytes) -> Icmp6NdOptions: while ptr < len(bytes): match Icmp6NdOptionType.from_bytes(bytes[ptr:]): case Icmp6NdOptionType.SLLA: - options.append( - Icmp6NdOptionSlla.from_bytes(bytes[ptr:]) - ) + options.append(Icmp6NdOptionSlla.from_bytes(bytes[ptr:])) case Icmp6NdOptionType.TLLA: - options.append( - Icmp6NdOptionTlla.from_bytes(bytes[ptr:]) - ) + options.append(Icmp6NdOptionTlla.from_bytes(bytes[ptr:])) case Icmp6NdOptionType.PI: - options.append( - Icmp6NdOptionPi.from_bytes(bytes[ptr:]) - ) + options.append(Icmp6NdOptionPi.from_bytes(bytes[ptr:])) case _: - options.append( - Icmp6NdOptionUnknown.from_bytes(bytes[ptr:]) - ) + options.append(Icmp6NdOptionUnknown.from_bytes(bytes[ptr:])) ptr += options[-1].len diff --git a/pytcp/protocols/ip4/options.py b/pytcp/protocols/ip4/options.py index 9e99c903..e5ad02c7 100644 --- a/pytcp/protocols/ip4/options.py +++ b/pytcp/protocols/ip4/options.py @@ -126,24 +126,12 @@ def from_bytes(bytes: bytes) -> Ip4Options: while ptr < len(bytes): match Ip4OptionType.from_bytes(bytes[ptr:]): case Ip4OptionType.EOL: - options.append( - Ip4OptionEol.from_bytes( - bytes[ptr:] - ) - ) + options.append(Ip4OptionEol.from_bytes(bytes[ptr:])) break case Ip4OptionType.NOP: - options.append( - Ip4OptionNop.from_bytes( - bytes[ptr:] - ) - ) + options.append(Ip4OptionNop.from_bytes(bytes[ptr:])) case _: - options.append( - Ip4OptionUnknown.from_bytes( - bytes[ptr:] - ) - ) + options.append(Ip4OptionUnknown.from_bytes(bytes[ptr:])) ptr += options[-1].len diff --git a/pytcp/protocols/tcp/options.py b/pytcp/protocols/tcp/options.py index a3c11a5a..115431fd 100644 --- a/pytcp/protocols/tcp/options.py +++ b/pytcp/protocols/tcp/options.py @@ -182,48 +182,20 @@ def from_bytes(bytes: bytes) -> TcpOptions: while ptr < len(bytes): match TcpOptionType.from_bytes(bytes[ptr:]): case TcpOptionType.EOL: - options.append( - TcpOptionEol.from_bytes( - bytes[ptr:] - ) - ) + options.append(TcpOptionEol.from_bytes(bytes[ptr:])) break case TcpOptionType.NOP: - options.append( - TcpOptionNop.from_bytes( - bytes[ptr:] - ) - ) + options.append(TcpOptionNop.from_bytes(bytes[ptr:])) case TcpOptionType.MSS: - options.append( - TcpOptionMss.from_bytes( - bytes[ptr:] - ) - ) + options.append(TcpOptionMss.from_bytes(bytes[ptr:])) case TcpOptionType.WSCALE: - options.append( - TcpOptionWscale.from_bytes( - bytes[ptr:] - ) - ) + options.append(TcpOptionWscale.from_bytes(bytes[ptr:])) case TcpOptionType.SACKPERM: - options.append( - TcpOptionSackPerm.from_bytes( - bytes[ptr:] - ) - ) + options.append(TcpOptionSackPerm.from_bytes(bytes[ptr:])) case TcpOptionType.TIMESTAMP: - options.append( - TcpOptionTimestamp.from_bytes( - bytes[ptr:] - ) - ) + options.append(TcpOptionTimestamp.from_bytes(bytes[ptr:])) case _: - options.append( - TcpOptionUnknown.from_bytes( - bytes[ptr:] - ) - ) + options.append(TcpOptionUnknown.from_bytes(bytes[ptr:])) ptr += options[-1].len