Skip to content

Commit

Permalink
Refactor ProtoEnum to use from_bytes method for enum value extraction.
Browse files Browse the repository at this point in the history
  • Loading branch information
ccie18643 committed Jul 15, 2024
1 parent 454ff58 commit c9b68bd
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 46 deletions.
27 changes: 26 additions & 1 deletion pytcp/lib/enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@


"""
Module contains the StaticEnum and ExtendableEnum classes.
Module contains the ProtoEnum class.
pytcp/lib/enum.py
Expand Down Expand Up @@ -96,6 +96,16 @@ def _extract(

raise NotImplementedError

@staticmethod
def _from_bytes(
bytes: bytes,
) -> int:
"""
Extract the enum value from the provided bytes.
"""

raise NotImplementedError

@classmethod
def from_frame(
cls,
Expand All @@ -111,6 +121,21 @@ def from_frame(

return cls(value)

@classmethod
def from_bytes(
cls,
/,
bytes: bytes,
) -> Self:
"""
Create the enum object from the provided bytes.
"""

if (value := cls._from_bytes(bytes)) not in cls:
extend_enum(cls, f"UNKNOWN_{value}", value)

return cls(value)

@classmethod
def get_core_values(cls) -> list[int]:
"""
Expand Down
24 changes: 12 additions & 12 deletions pytcp/protocols/icmp6/options__nd.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ class Icmp6NdOptionType(ProtoEnum):
PI = 3

@staticmethod
def _extract(frame: bytes) -> int:
return int(frame[0])
def _from_bytes(bytes: bytes) -> int:
return int.from_bytes(bytes[0:1])


ICMP6_ND_OPT_LEN__SLLA = 8
Expand Down Expand Up @@ -220,34 +220,34 @@ def pi(self) -> list[NdPrefixInfo]:
return prefix_info_list

@staticmethod
def from_bytes(frame: bytes) -> Icmp6NdOptions:
def from_bytes(bytes: bytes) -> Icmp6NdOptions:
"""
Read the ICMPv6 ND options from bytes.
"""

option__ptr = 0

ptr = 0
options: list[Icmp6NdOption] = []
while option__ptr < len(frame):
match Icmp6NdOptionType.from_frame(frame[option__ptr:]):

while ptr < len(bytes):
match Icmp6NdOptionType.from_bytes(bytes[ptr:]):
case Icmp6NdOptionType.SLLA:
options.append(
Icmp6NdOptionSlla.from_bytes(frame[option__ptr:])
Icmp6NdOptionSlla.from_bytes(bytes[ptr:])
)
case Icmp6NdOptionType.TLLA:
options.append(
Icmp6NdOptionTlla.from_bytes(frame[option__ptr:])
Icmp6NdOptionTlla.from_bytes(bytes[ptr:])
)
case Icmp6NdOptionType.PI:
options.append(
Icmp6NdOptionPi.from_bytes(frame[option__ptr:])
Icmp6NdOptionPi.from_bytes(bytes[ptr:])
)
case _:
options.append(
Icmp6NdOptionUnknown.from_bytes(frame[option__ptr:])
Icmp6NdOptionUnknown.from_bytes(bytes[ptr:])
)

option__ptr += options[-1].len
ptr += options[-1].len

return Icmp6NdOptions(*options)

Expand Down
35 changes: 22 additions & 13 deletions pytcp/protocols/ip4/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

from pytcp.lib.enum import ProtoEnum
from pytcp.lib.proto import Proto
from pytcp.protocols.ip4.header import IP4_HEADER_LEN


class Ip4OptionType(ProtoEnum):
Expand All @@ -56,9 +55,10 @@ class Ip4OptionType(ProtoEnum):
EOL = 0
NOP = 1

@override
@staticmethod
def _extract(frame: bytes) -> int:
return frame[0]
def _from_bytes(bytes: bytes) -> int:
return int.from_bytes(bytes[0:1])


IP4_OPTION_LEN__EOL = 1
Expand Down Expand Up @@ -115,28 +115,37 @@ def __bool__(self) -> bool:
return bool(self._options)

@staticmethod
def from_frame(frame: bytes) -> Ip4Options:
def from_bytes(bytes: bytes) -> Ip4Options:
"""
Read the IPv4 options from frame.
"""

option__ptr = IP4_HEADER_LEN
ip4__hlen = (frame[0] & 0x0F) << 2

ptr = 0
options: list[Ip4Option] = []
while option__ptr < ip4__hlen:
match Ip4OptionType.from_frame(frame[option__ptr:]):

while ptr < len(bytes):
match Ip4OptionType.from_bytes(bytes[ptr:]):
case Ip4OptionType.EOL:
options.append(Ip4OptionEol.from_bytes(frame[option__ptr:]))
options.append(
Ip4OptionEol.from_bytes(
bytes[ptr:]
)
)
break
case Ip4OptionType.NOP:
options.append(Ip4OptionNop.from_bytes(frame[option__ptr:]))
options.append(
Ip4OptionNop.from_bytes(
bytes[ptr:]
)
)
case _:
options.append(
Ip4OptionUnknown.from_bytes(frame[option__ptr:])
Ip4OptionUnknown.from_bytes(
bytes[ptr:]
)
)

option__ptr += options[-1].len
ptr += options[-1].len

return Ip4Options(*options)

Expand Down
4 changes: 3 additions & 1 deletion pytcp/protocols/ip4/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ def _parse(self) -> None:
"""

self._header = Ip4Header.from_frame(self._frame)
self._options = Ip4Options.from_frame(self._frame)
self._options = Ip4Options.from_bytes(
self._frame[IP4_HEADER_LEN : self._header.hlen]
)

self._hlen = self._header.hlen
self._plen = self._header.plen
Expand Down
52 changes: 35 additions & 17 deletions pytcp/protocols/tcp/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@

from pytcp.lib.enum import ProtoEnum
from pytcp.lib.proto import Proto
from pytcp.protocols.tcp.header import TCP_HEADER_LEN

TCP_DEFAULT_MSS = 536

Expand All @@ -62,8 +61,8 @@ class TcpOptionType(ProtoEnum):
TIMESTAMP = 8

@staticmethod
def _extract(frame: bytes) -> int:
return frame[0]
def _from_bytes(bytes: bytes) -> int:
return int.from_bytes(bytes[0:1])


TCP_OPTION_LEN__EOL = 1
Expand Down Expand Up @@ -172,42 +171,61 @@ def timestamp(self) -> tuple[int, int] | None:
return None

@staticmethod
def from_frame(frame: bytes) -> TcpOptions:
def from_bytes(bytes: bytes) -> TcpOptions:
"""
Read the TCP options from frame.
"""

option__ptr = TCP_HEADER_LEN
tcp__hlen = (frame[12] >> 4) << 2

ptr = 0
options: list[TcpOption] = []
while option__ptr < tcp__hlen:
match TcpOptionType.from_frame(frame[option__ptr:]):

while ptr < len(bytes):
match TcpOptionType.from_bytes(bytes[ptr:]):
case TcpOptionType.EOL:
options.append(TcpOptionEol.from_bytes(frame[option__ptr:]))
options.append(
TcpOptionEol.from_bytes(
bytes[ptr:]
)
)
break
case TcpOptionType.NOP:
options.append(TcpOptionNop.from_bytes(frame[option__ptr:]))
options.append(
TcpOptionNop.from_bytes(
bytes[ptr:]
)
)
case TcpOptionType.MSS:
options.append(TcpOptionMss.from_bytes(frame[option__ptr:]))
options.append(
TcpOptionMss.from_bytes(
bytes[ptr:]
)
)
case TcpOptionType.WSCALE:
options.append(
TcpOptionWscale.from_bytes(frame[option__ptr:])
TcpOptionWscale.from_bytes(
bytes[ptr:]
)
)
case TcpOptionType.SACKPERM:
options.append(
TcpOptionSackPerm.from_bytes(frame[option__ptr:])
TcpOptionSackPerm.from_bytes(
bytes[ptr:]
)
)
case TcpOptionType.TIMESTAMP:
options.append(
TcpOptionTimestamp.from_bytes(frame[option__ptr:])
TcpOptionTimestamp.from_bytes(
bytes[ptr:]
)
)
case _:
options.append(
TcpOptionUnknown.from_bytes(frame[option__ptr:])
TcpOptionUnknown.from_bytes(
bytes[ptr:]
)
)

option__ptr += options[-1].len
ptr += options[-1].len

return TcpOptions(*options)

Expand Down
5 changes: 3 additions & 2 deletions pytcp/protocols/tcp/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,15 @@ def _parse(self) -> None:
"""

self._header = TcpHeader.from_frame(self._frame)
self._options = TcpOptions.from_bytes(
self._frame[TCP_HEADER_LEN : self._header.hlen]
)

self._plen = self._ip__dlen
self._hlen = self._header.hlen
self._olen = self._hlen - TCP_HEADER_LEN
self._dlen = self._plen - self._hlen

self._options = TcpOptions.from_frame(frame=self._frame)

self._data = self._frame[self._hlen : self._plen]

@override
Expand Down

0 comments on commit c9b68bd

Please sign in to comment.