From e92e7ad7ab07d858fa90adfc84d45d7b0a91c0a6 Mon Sep 17 00:00:00 2001 From: Sebastian Majewski Date: Thu, 11 Jul 2024 19:07:04 -0500 Subject: [PATCH] Converted IPv4 to use header class --- pytcp/protocols/ip4/phtx.py | 4 +-- pytcp/protocols/udp/fpa.py | 13 ++++++--- pytcp/protocols/udp/fpp.py | 15 +++++----- pytcp/protocols/udp/ps.py | 57 ++++++++++++++++++++++++------------- 4 files changed, 56 insertions(+), 33 deletions(-) diff --git a/pytcp/protocols/ip4/phtx.py b/pytcp/protocols/ip4/phtx.py index 910bcf33..9d95475e 100755 --- a/pytcp/protocols/ip4/phtx.py +++ b/pytcp/protocols/ip4/phtx.py @@ -338,8 +338,8 @@ def __validate_src_ip4_address( if ( ip4__src.is_unspecified and isinstance(ip4__payload, UdpAssembler) - and ip4__payload._sport == 68 - and ip4__payload._dport == 67 + and ip4__payload._header.sport == 68 + and ip4__payload._header.dport == 67 ): self.packet_stats_tx.ip4__src_unspecified__send += 1 __debug__ and log( diff --git a/pytcp/protocols/udp/fpa.py b/pytcp/protocols/udp/fpa.py index 5f2d99f0..1768e067 100755 --- a/pytcp/protocols/udp/fpa.py +++ b/pytcp/protocols/udp/fpa.py @@ -40,7 +40,7 @@ from pytcp.lib.ip_helper import inet_cksum from pytcp.lib.proto import ProtoAssembler from pytcp.lib.tracker import Tracker -from pytcp.protocols.udp.ps import UDP_HEADER_LEN, Udp +from pytcp.protocols.udp.ps import UDP_HEADER_LEN, Udp, UdpHeader class UdpAssembler(Udp, ProtoAssembler): @@ -65,13 +65,18 @@ def __init__( self._tracker: Tracker = Tracker(prefix="TX", echo_tracker=echo_tracker) - self._sport = udp__sport - self._dport = udp__dport self._data = b"" if udp__data is None else udp__data + + self._header = UdpHeader( + sport=udp__sport, + dport=udp__dport, + plen=UDP_HEADER_LEN + len(self._data), + cksum=0, + ) + self._plen = UDP_HEADER_LEN + len(self._data) self._hlen = UDP_HEADER_LEN self._dlen = len(self._data) - self._cksum = 0 def __len__(self) -> int: """ diff --git a/pytcp/protocols/udp/fpp.py b/pytcp/protocols/udp/fpp.py index 2f30f74b..a4a1b04a 100755 --- a/pytcp/protocols/udp/fpp.py +++ b/pytcp/protocols/udp/fpp.py @@ -43,7 +43,7 @@ from pytcp.lib.errors import PacketIntegrityError, PacketSanityError from pytcp.lib.ip_helper import inet_cksum from pytcp.lib.proto import ProtoParser -from pytcp.protocols.udp.ps import UDP_HEADER_LEN, Udp +from pytcp.protocols.udp.ps import UDP_HEADER_LEN, Udp, UdpHeader if TYPE_CHECKING: from pytcp.lib.packet import PacketRx @@ -54,7 +54,7 @@ class UdpIntegrityError(PacketIntegrityError): Exception raised when UDP packet integrity check fails. """ - def __init__(self, message: str): + def __init__(self, /, message: str): super().__init__("[UDP] " + message) @@ -63,7 +63,7 @@ class UdpSanityError(PacketSanityError): Exception raised when UDP packet sanity check fails. """ - def __init__(self, message: str): + def __init__(self, /, message: str): super().__init__("[UDP] " + message) @@ -121,9 +121,8 @@ def _parse(self) -> None: Parse the packet. """ - self._sport, self._dport, self._plen, self._cksum = struct.unpack( - "!HHHH", self._frame[:8] - ) + self._header = UdpHeader.from_frame(self._frame) + self._plen = self._header.plen self._data = self._frame[8 : self._plen] self._hlen = UDP_HEADER_LEN self._dlen = len(self._data) @@ -133,12 +132,12 @@ def _validate_sanity(self) -> None: Validate the packet sanity after parsing it. """ - if self._sport == 0: + if self._header.sport == 0: raise UdpSanityError( "The 'udp_sport' must be greater than 0", ) - if self._dport == 0: + if self._header.dport == 0: raise UdpSanityError( "The 'udp_dport' must be greater then 0", ) diff --git a/pytcp/protocols/udp/ps.py b/pytcp/protocols/udp/ps.py index b4673cdd..456bb14f 100755 --- a/pytcp/protocols/udp/ps.py +++ b/pytcp/protocols/udp/ps.py @@ -29,13 +29,14 @@ pytcp/protocols/udp/ps.py -ver 2.8 +ver 2.9 """ from __future__ import annotations import struct +from dataclasses import dataclass from typing import override from pytcp.lib.proto import Proto @@ -51,6 +52,26 @@ # +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +@dataclass +class UdpHeader: + """ + Data class representing the UDP header. + """ + + sport: int = 0 + dport: int = 0 + plen: int = 0 + cksum: int = 0 + + @staticmethod + def from_frame(frame: bytes) -> UdpHeader: + """ + Populate the header from the raw frame. + """ + + return UdpHeader(*struct.unpack("!HHHH", frame[:8])) + + UDP_HEADER_LEN = 8 @@ -62,10 +83,10 @@ class Udp(Proto): _ip6_next = Ip6Next.UDP _ip4_proto = Ip4Proto.UDP - _sport: int - _dport: int - _cksum: int + _header: UdpHeader + _data: bytes + _plen: int _hlen: int _dlen: int @@ -76,7 +97,7 @@ def __str__(self) -> str: Get packet log string. """ - return f"UDP {self._sport} > {self._dport}, plen {self._plen}" + return f"UDP {self._header.sport} > {self._header.dport}, plen {self._plen}" @override def __repr__(self) -> str: @@ -86,12 +107,10 @@ def __repr__(self) -> str: return ( f"Udp(" - f"sport={self._sport!r}," - f"dport={self._dport!r}," - f"plen={self._plen!r}," - f"hlen={self._plen!r}," - f"dlen={self._plen!r}," - f"cksum={self._cksum!r})" + f"sport={self._header.sport!r}," + f"dport={self._header.dport!r}," + f"plen={self._header.plen!r}," + f"cksum={self._header.cksum!r})" ) @override @@ -103,10 +122,10 @@ def __bytes__(self) -> bytes: return ( struct.pack( "!HHHH", - self._sport, - self._dport, - self._plen, - self._cksum, + self._header.sport, + self._header.dport, + self._header.plen, + self._header.cksum, ) + self._data ) @@ -133,7 +152,7 @@ def sport(self) -> int: Get the '_sport' property. """ - return self._sport + return self._header.sport @property def dport(self) -> int: @@ -141,7 +160,7 @@ def dport(self) -> int: Get the '_dport' property. """ - return self._dport + return self._header.dport @property def plen(self) -> int: @@ -149,7 +168,7 @@ def plen(self) -> int: Get the '_plen' property. """ - return self._plen + return self._header.plen @property def cksum(self) -> int: @@ -157,7 +176,7 @@ def cksum(self) -> int: Get the '_cksum' property. """ - return self._cksum + return self._header.cksum @property def data(self) -> bytes: