Skip to content

Commit

Permalink
Converted IPv4 to use header class
Browse files Browse the repository at this point in the history
  • Loading branch information
ccie18643 committed Jul 12, 2024
1 parent 110a97e commit e92e7ad
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pytcp/protocols/ip4/phtx.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,8 @@ def __validate_src_ip4_address(
if (
ip4__src.is_unspecified
and isinstance(ip4__payload, UdpAssembler)
and ip4__payload._sport == 68
and ip4__payload._dport == 67
and ip4__payload._header.sport == 68
and ip4__payload._header.dport == 67
):
self.packet_stats_tx.ip4__src_unspecified__send += 1
__debug__ and log(
Expand Down
13 changes: 9 additions & 4 deletions pytcp/protocols/udp/fpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from pytcp.lib.ip_helper import inet_cksum
from pytcp.lib.proto import ProtoAssembler
from pytcp.lib.tracker import Tracker
from pytcp.protocols.udp.ps import UDP_HEADER_LEN, Udp
from pytcp.protocols.udp.ps import UDP_HEADER_LEN, Udp, UdpHeader


class UdpAssembler(Udp, ProtoAssembler):
Expand All @@ -65,13 +65,18 @@ def __init__(

self._tracker: Tracker = Tracker(prefix="TX", echo_tracker=echo_tracker)

self._sport = udp__sport
self._dport = udp__dport
self._data = b"" if udp__data is None else udp__data

self._header = UdpHeader(
sport=udp__sport,
dport=udp__dport,
plen=UDP_HEADER_LEN + len(self._data),
cksum=0,
)

self._plen = UDP_HEADER_LEN + len(self._data)
self._hlen = UDP_HEADER_LEN
self._dlen = len(self._data)
self._cksum = 0

def __len__(self) -> int:
"""
Expand Down
15 changes: 7 additions & 8 deletions pytcp/protocols/udp/fpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from pytcp.lib.errors import PacketIntegrityError, PacketSanityError
from pytcp.lib.ip_helper import inet_cksum
from pytcp.lib.proto import ProtoParser
from pytcp.protocols.udp.ps import UDP_HEADER_LEN, Udp
from pytcp.protocols.udp.ps import UDP_HEADER_LEN, Udp, UdpHeader

if TYPE_CHECKING:
from pytcp.lib.packet import PacketRx
Expand All @@ -54,7 +54,7 @@ class UdpIntegrityError(PacketIntegrityError):
Exception raised when UDP packet integrity check fails.
"""

def __init__(self, message: str):
def __init__(self, /, message: str):
super().__init__("[UDP] " + message)


Expand All @@ -63,7 +63,7 @@ class UdpSanityError(PacketSanityError):
Exception raised when UDP packet sanity check fails.
"""

def __init__(self, message: str):
def __init__(self, /, message: str):
super().__init__("[UDP] " + message)


Expand Down Expand Up @@ -121,9 +121,8 @@ def _parse(self) -> None:
Parse the packet.
"""

self._sport, self._dport, self._plen, self._cksum = struct.unpack(
"!HHHH", self._frame[:8]
)
self._header = UdpHeader.from_frame(self._frame)
self._plen = self._header.plen
self._data = self._frame[8 : self._plen]
self._hlen = UDP_HEADER_LEN
self._dlen = len(self._data)
Expand All @@ -133,12 +132,12 @@ def _validate_sanity(self) -> None:
Validate the packet sanity after parsing it.
"""

if self._sport == 0:
if self._header.sport == 0:
raise UdpSanityError(
"The 'udp_sport' must be greater than 0",
)

if self._dport == 0:
if self._header.dport == 0:
raise UdpSanityError(
"The 'udp_dport' must be greater then 0",
)
57 changes: 38 additions & 19 deletions pytcp/protocols/udp/ps.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@
pytcp/protocols/udp/ps.py
ver 2.8
ver 2.9
"""


from __future__ import annotations

import struct
from dataclasses import dataclass
from typing import override

from pytcp.lib.proto import Proto
Expand All @@ -51,6 +52,26 @@
# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+


@dataclass
class UdpHeader:
"""
Data class representing the UDP header.
"""

sport: int = 0
dport: int = 0
plen: int = 0
cksum: int = 0

@staticmethod
def from_frame(frame: bytes) -> UdpHeader:
"""
Populate the header from the raw frame.
"""

return UdpHeader(*struct.unpack("!HHHH", frame[:8]))


UDP_HEADER_LEN = 8


Expand All @@ -62,10 +83,10 @@ class Udp(Proto):
_ip6_next = Ip6Next.UDP
_ip4_proto = Ip4Proto.UDP

_sport: int
_dport: int
_cksum: int
_header: UdpHeader

_data: bytes

_plen: int
_hlen: int
_dlen: int
Expand All @@ -76,7 +97,7 @@ def __str__(self) -> str:
Get packet log string.
"""

return f"UDP {self._sport} > {self._dport}, plen {self._plen}"
return f"UDP {self._header.sport} > {self._header.dport}, plen {self._plen}"

@override
def __repr__(self) -> str:
Expand All @@ -86,12 +107,10 @@ def __repr__(self) -> str:

return (
f"Udp("
f"sport={self._sport!r},"
f"dport={self._dport!r},"
f"plen={self._plen!r},"
f"hlen={self._plen!r},"
f"dlen={self._plen!r},"
f"cksum={self._cksum!r})"
f"sport={self._header.sport!r},"
f"dport={self._header.dport!r},"
f"plen={self._header.plen!r},"
f"cksum={self._header.cksum!r})"
)

@override
Expand All @@ -103,10 +122,10 @@ def __bytes__(self) -> bytes:
return (
struct.pack(
"!HHHH",
self._sport,
self._dport,
self._plen,
self._cksum,
self._header.sport,
self._header.dport,
self._header.plen,
self._header.cksum,
)
+ self._data
)
Expand All @@ -133,31 +152,31 @@ def sport(self) -> int:
Get the '_sport' property.
"""

return self._sport
return self._header.sport

@property
def dport(self) -> int:
"""
Get the '_dport' property.
"""

return self._dport
return self._header.dport

@property
def plen(self) -> int:
"""
Get the '_plen' property.
"""

return self._plen
return self._header.plen

@property
def cksum(self) -> int:
"""
Get the '_cksum' property.
"""

return self._cksum
return self._header.cksum

@property
def data(self) -> bytes:
Expand Down

0 comments on commit e92e7ad

Please sign in to comment.