diff --git a/pytcp/protocols/icmp6/assembler.py b/pytcp/protocols/icmp6/assembler.py index f32499fb..74a6a6bb 100755 --- a/pytcp/protocols/icmp6/assembler.py +++ b/pytcp/protocols/icmp6/assembler.py @@ -25,6 +25,7 @@ # pylint: disable = too-many-instance-attributes # pylint: disable = too-many-locals +# pylint: disable = too-few-public-methods # pylint: disable = too-many-return-statements # pylint: disable = too-many-arguments # pylint: disable = redefined-builtin @@ -41,9 +42,8 @@ from __future__ import annotations import struct -from typing import TYPE_CHECKING -from pytcp.lib.ip6_address import Ip6Address, Ip6Network +from pytcp.lib.ip6_address import Ip6Address from pytcp.lib.ip_helper import inet_cksum from pytcp.lib.proto import ProtoAssembler from pytcp.lib.tracker import Tracker @@ -79,17 +79,9 @@ from pytcp.protocols.icmp6.message__nd__router_solicitation import ( Icmp6NdRouterSolicitationMessage, ) -from pytcp.protocols.icmp6.options__nd import ( - Icmp6NdOption, - Icmp6NdOptionPi, - Icmp6NdOptionSlla, - Icmp6NdOptionTlla, -) +from pytcp.protocols.icmp6.options__nd import Icmp6NdOptions from pytcp.protocols.ip6.header import IP6_HEADER_LEN -if TYPE_CHECKING: - from pytcp.lib.mac_address import MacAddress - class Icmp6Assembler(Icmp6, ProtoAssembler): """ @@ -216,7 +208,7 @@ class Icmp6NdRouterSolicitationMessageAssembler( Assembler class for the ICMPv6 ND Router Soliciation message. """ - def __init__(self, *, nd_options: list[Icmp6NdOption]) -> None: + def __init__(self, *, nd_options: Icmp6NdOptions) -> None: """ Create the message object. """ @@ -241,7 +233,7 @@ def __init__( router_lifetime: int, reachable_time: int, retrans_timer: int, - nd_options: list[Icmp6NdOption], + nd_options: Icmp6NdOptions, ) -> None: """ Create the message object. @@ -269,7 +261,7 @@ class Icmp6NdNeighborSolicitationMessageAssembler( """ def __init__( - self, *, target_address: Ip6Address, nd_options: list[Icmp6NdOption] + self, *, target_address: Ip6Address, nd_options: Icmp6NdOptions ) -> None: """ Create the message object. @@ -294,7 +286,7 @@ def __init__( flag_s: bool = False, flag_o: bool = False, target_address: Ip6Address, - nd_options: list[Icmp6NdOption], + nd_options: Icmp6NdOptions, ) -> None: """ Create the message object. @@ -323,67 +315,6 @@ def __init__(self, *, records: list[Icmp6Mld2AddressRecord]) -> None: self._records = records -# -# ICMPv6 Neighbor Discovery options -# - - -class Icmp6NdOptSllaAssembler(Icmp6NdOptionSlla): - """ - ICMPv6 ND option assembler - Source Link Layer Address (1). - """ - - def __init__(self, *, slla: MacAddress) -> None: - """ - Create the option object. - """ - - self._slla = slla - - -class Icmp6NdOptTllaAssembler(Icmp6NdOptionTlla): - """ - ICMPv6 ND option - Target Link Layer Address (2). - """ - - def __init__(self, *, tlla: MacAddress) -> None: - """ - Create the option object. - """ - - self._tlla = tlla - - -class Icmp6NdOptPiAssembler(Icmp6NdOptionPi): - """ - ICMPv6 ND option assembler - Prefix Information (3). - """ - - def __init__( - self, - *, - flag_l: bool = False, - flag_a: bool = False, - flag_r: bool = False, - valid_lifetime: int, - prefer_lifetime: int, - prefix: Ip6Network, - ) -> None: - """ - Option constructor. - """ - - assert 0 <= valid_lifetime <= 0xFFFFFFFF - assert 0 <= prefer_lifetime <= 0xFFFFFFFF - - self._flag_l = flag_l - self._flag_a = flag_a - self._flag_r = flag_r - self._valid_lifetime = valid_lifetime - self._prefer_lifetime = prefer_lifetime - self._prefix = prefix - - # # ICMPv6 MLD2 Multicast support classes # diff --git a/pytcp/protocols/icmp6/message__nd.py b/pytcp/protocols/icmp6/message__nd.py index e3add27d..1090bac2 100644 --- a/pytcp/protocols/icmp6/message__nd.py +++ b/pytcp/protocols/icmp6/message__nd.py @@ -38,15 +38,9 @@ class for the ICMPv6 protocol. from __future__ import annotations -from pytcp.lib.ip6_address import Ip6Network from pytcp.lib.mac_address import MacAddress from pytcp.protocols.icmp6.message import Icmp6Message, Icmp6Type -from pytcp.protocols.icmp6.options__nd import ( - Icmp6NdOption, - Icmp6NdOptionPi, - Icmp6NdOptionSlla, - Icmp6NdOptionTlla, -) +from pytcp.protocols.icmp6.options__nd import Icmp6NdOptions, NdPrefixInfo class Icmp6NdMessage(Icmp6Message): @@ -55,20 +49,12 @@ class Icmp6NdMessage(Icmp6Message): """ _type: Icmp6Type - _nd_options: list[Icmp6NdOption] + _nd_options: Icmp6NdOptions @property - def _raw_nd_options(self) -> bytes: + def nd_options(self) -> Icmp6NdOptions: """ - Get the ND options in raw format. - """ - - return b"".join(bytes(nd_option) for nd_option in self._nd_options) - - @property - def nd_options(self) -> list[Icmp6NdOption]: - """ - Get the '_nd_options' property. + Get the '_nd_options' attribute. """ return self._nd_options @@ -79,14 +65,7 @@ def option_slla(self) -> MacAddress | None: Get the Source Link Layer Address option. """ - # TODO: Currently we are returning the first option, - # should we return list of matching options instead ? - - for option in self._nd_options: - if isinstance(option, Icmp6NdOptionSlla): - return option.slla - - return None + return self._nd_options.slla @property def option_tlla(self) -> MacAddress | None: @@ -94,28 +73,12 @@ def option_tlla(self) -> MacAddress | None: Get the Target Link Layer Address option. """ - # TODO: Currently we are returning the first option, - # should we return list of matching options instead ? - - for option in self._nd_options: - if isinstance(option, Icmp6NdOptionTlla): - return option.tlla - - return None + return self._nd_options.tlla @property - def option_pi(self) -> list[Ip6Network]: + def option_pi(self) -> list[NdPrefixInfo]: """ Get the Prefix Information option. """ - # TODO: Currently we are returning the first option, - # should we return list of matching options instead ? - - prefixes = [] - - for option in self._nd_options: - if isinstance(option, Icmp6NdOptionPi): - prefixes.append(option.prefix) - - return prefixes + return self._nd_options.pi diff --git a/pytcp/protocols/icmp6/message__nd__neighbor_advertisement.py b/pytcp/protocols/icmp6/message__nd__neighbor_advertisement.py index 45d2febb..0ba359e9 100644 --- a/pytcp/protocols/icmp6/message__nd__neighbor_advertisement.py +++ b/pytcp/protocols/icmp6/message__nd__neighbor_advertisement.py @@ -42,7 +42,7 @@ class for the ICMPv6 protocol. from pytcp.lib.ip6_address import Ip6Address from pytcp.protocols.icmp6.message import Icmp6Code, Icmp6Type from pytcp.protocols.icmp6.message__nd import Icmp6NdMessage -from pytcp.protocols.icmp6.options__nd import Icmp6NdOption +from pytcp.protocols.icmp6.options__nd import Icmp6NdOptions # 'ND - Neighbor Advertisement' message (136/0) @@ -86,7 +86,7 @@ class Icmp6NdNeighborAdvertisementMessage(Icmp6NdMessage): _flag_s: bool _flag_o: bool _target_address: Ip6Address - _nd_options: list[Icmp6NdOption] + _nd_options: Icmp6NdOptions @override def __len__(self) -> int: @@ -94,8 +94,8 @@ def __len__(self) -> int: Get the message length. """ - return ICMP6_MESSAGE_LEN__ND_NEIGHBOR_ADVERTISEMENT + sum( - len(option) for option in self._nd_options + return ICMP6_MESSAGE_LEN__ND_NEIGHBOR_ADVERTISEMENT + len( + self._nd_options ) @override @@ -111,7 +111,7 @@ def __str__(self) -> str: f"{'S' if self._flag_s else '-'}" f"{'O' if self._flag_o else '-'}, " f"target {self._target_address}, " - f"{', '.join(str(nd_option) for nd_option in self._nd_options)}" + f"opts [{self._nd_options}]" ) @override @@ -126,7 +126,7 @@ def __repr__(self) -> str: f"flag_s={self._flag_s!r}, " f"flag_o={self._flag_o!r}, " f"target={self._target_address!r}, " - f"{', '.join(f"{nd_option!r}" for nd_option in self._nd_options)})" + f"options={self._nd_options!r})" ) @override @@ -136,7 +136,7 @@ def __bytes__(self) -> bytes: """ return struct.pack( - f"! BBH L 16s {len(self._raw_nd_options)}s", + f"! BBH L 16s {len(self._nd_options)}s", int(self._type), int(self._code), 0, @@ -145,7 +145,7 @@ def __bytes__(self) -> bytes: | (self._flag_o << 29) | (self._reserved & 0b00011111_11111111_11111111_11111111), bytes(self._target_address), - self._raw_nd_options, + bytes(self._nd_options), ) @property diff --git a/pytcp/protocols/icmp6/message__nd__neighbor_solicitation.py b/pytcp/protocols/icmp6/message__nd__neighbor_solicitation.py index 857807a5..80453bee 100644 --- a/pytcp/protocols/icmp6/message__nd__neighbor_solicitation.py +++ b/pytcp/protocols/icmp6/message__nd__neighbor_solicitation.py @@ -42,7 +42,7 @@ class for the ICMPv6 protocol. from pytcp.lib.ip6_address import Ip6Address from pytcp.protocols.icmp6.message import Icmp6Code, Icmp6Type from pytcp.protocols.icmp6.message__nd import Icmp6NdMessage -from pytcp.protocols.icmp6.options__nd import Icmp6NdOption +from pytcp.protocols.icmp6.options__nd import Icmp6NdOptions # 'Neighbor Discovery - Neighbor Solicitation' message (135/0) @@ -83,7 +83,7 @@ class Icmp6NdNeighborSolicitationMessage(Icmp6NdMessage): _reserved: int _target_address: Ip6Address - _nd_options: list[Icmp6NdOption] + _nd_options: Icmp6NdOptions @override def __len__(self) -> int: @@ -91,8 +91,8 @@ def __len__(self) -> int: Get the message length. """ - return ICMP6_MESSAGE_LEN__ND_NEIGHBOR_SOLICITATION + sum( - len(option) for option in self._nd_options + return ICMP6_MESSAGE_LEN__ND_NEIGHBOR_SOLICITATION + len( + self._nd_options ) @override @@ -104,7 +104,7 @@ def __str__(self) -> str: return ( f"ICMP6 ND Neighbor Solicitation, " f"target {self._target_address}, " - f"{', '.join(str(nd_option) for nd_option in self._nd_options)}" + f"opts [{self._nd_options}]" ) @override @@ -115,8 +115,8 @@ def __repr__(self) -> str: return ( "Icmp6NdNeighborSolicitationMessage(" - f"target {self._target_address!r}, " - f"{', '.join(f"{nd_option!r}" for nd_option in self._nd_options)})" + f"target={self._target_address!r}, " + f"options={self._nd_options!r})" ) @override @@ -126,13 +126,13 @@ def __bytes__(self) -> bytes: """ return struct.pack( - f"! BBH L 16s {len(self._raw_nd_options)}s", + f"! BBH L 16s {len(self._nd_options)}s", int(self._type), int(self._code), 0, self._reserved, bytes(self._target_address), - self._raw_nd_options, + bytes(self._nd_options), ) @property diff --git a/pytcp/protocols/icmp6/message__nd__router_advertisement.py b/pytcp/protocols/icmp6/message__nd__router_advertisement.py index 09a072d0..c03ae4d0 100644 --- a/pytcp/protocols/icmp6/message__nd__router_advertisement.py +++ b/pytcp/protocols/icmp6/message__nd__router_advertisement.py @@ -41,7 +41,7 @@ class for the ICMPv6 protocol. from pytcp.protocols.icmp6.message import Icmp6Code, Icmp6Type from pytcp.protocols.icmp6.message__nd import Icmp6NdMessage -from pytcp.protocols.icmp6.options__nd import Icmp6NdOption +from pytcp.protocols.icmp6.options__nd import Icmp6NdOptions # 'Neighbor Discovery - Router Advertisement' message (134/0) @@ -82,7 +82,7 @@ class Icmp6NdRouterAdvertisementMessage(Icmp6NdMessage): _router_lifetime: int _reachable_time: int _retrans_timer: int - _nd_options: list[Icmp6NdOption] + _nd_options: Icmp6NdOptions @override def __len__(self) -> int: @@ -90,8 +90,8 @@ def __len__(self) -> int: Get the message length. """ - return ICMP6_MESSAGE_LEN__ND_ROUTER_ADVERTISEMENT + sum( - len(option) for option in self._nd_options + return ICMP6_MESSAGE_LEN__ND_ROUTER_ADVERTISEMENT + len( + self._nd_options ) @override @@ -109,7 +109,7 @@ def __str__(self) -> str: f"rlft {self._router_lifetime}, " f"reacht {self._reachable_time}, " f"retrt {self._retrans_timer}, " - f"{', '.join(str(nd_option) for nd_option in self._nd_options)}" + f"opts [{self._nd_options}]" ) @override @@ -126,7 +126,7 @@ def __repr__(self) -> str: f"rlft={self._router_lifetime!r}, " f"reacht={self._reachable_time!r}, " f"retrt={self._retrans_timer!r}, " - f"{', '.join(f"{nd_option!r}" for nd_option in self._nd_options)})" + f"options={self._nd_options!r})" ) @override @@ -136,7 +136,7 @@ def __bytes__(self) -> bytes: """ return struct.pack( - f"! BBH BBH L L {len(self._raw_nd_options)}s", + f"! BBH BBH L L {len(self._nd_options)}s", int(self._type), int(self._code), 0, @@ -145,7 +145,7 @@ def __bytes__(self) -> bytes: self._router_lifetime, self._reachable_time, self._retrans_timer, - self._raw_nd_options, + bytes(self._nd_options), ) @property diff --git a/pytcp/protocols/icmp6/message__nd__router_solicitation.py b/pytcp/protocols/icmp6/message__nd__router_solicitation.py index 921ed6d0..38ea820d 100644 --- a/pytcp/protocols/icmp6/message__nd__router_solicitation.py +++ b/pytcp/protocols/icmp6/message__nd__router_solicitation.py @@ -41,7 +41,7 @@ class for the ICMPv6 protocol. from pytcp.protocols.icmp6.message import Icmp6Code, Icmp6Type from pytcp.protocols.icmp6.message__nd import Icmp6NdMessage -from pytcp.protocols.icmp6.options__nd import Icmp6NdOption +from pytcp.protocols.icmp6.options__nd import Icmp6NdOptions # 'Neighbor Discovery - Router Solicitation' message (133/0) @@ -73,7 +73,7 @@ class Icmp6NdRouterSolicitationMessage(Icmp6NdMessage): _code = Icmp6NdRouterSolicitationCode.DEFAULT _reserved: int - _nd_options: list[Icmp6NdOption] + _nd_options: Icmp6NdOptions @override def __len__(self) -> int: @@ -81,9 +81,7 @@ def __len__(self) -> int: Get the messeage length. """ - return ICMP6_MESSAGE_LEN__ND_ROUTER_SOLICITATION + sum( - len(option) for option in self._nd_options - ) + return ICMP6_MESSAGE_LEN__ND_ROUTER_SOLICITATION + len(self._nd_options) @override def __str__(self) -> str: @@ -91,10 +89,7 @@ def __str__(self) -> str: Get the message log string. """ - return ( - f"ICMPv6 ND Router Solicitation, " - f"{', '.join(str(nd_option) for nd_option in self._nd_options)}" - ) + return f"ICMPv6 ND Router Solicitation, " f"opts [{self._nd_options}]" @override def __repr__(self) -> str: @@ -103,8 +98,7 @@ def __repr__(self) -> str: """ return ( - "Icmp6NdRouterSolicitationMessage(" - f"{', '.join(f"{nd_option!r}" for nd_option in self._nd_options)})" + "Icmp6NdRouterSolicitationMessage(" f"options={self._nd_options!r})" ) @override @@ -114,10 +108,10 @@ def __bytes__(self) -> bytes: """ return struct.pack( - f"! BBH L {len(self._raw_nd_options)}s", + f"! BBH L {len(self._nd_options)}s", int(self._type), int(self._code), 0, self._reserved, - self._raw_nd_options, + bytes(self._nd_options), ) diff --git a/pytcp/protocols/icmp6/options__nd.py b/pytcp/protocols/icmp6/options__nd.py index 4c4bccce..6ed849ad 100644 --- a/pytcp/protocols/icmp6/options__nd.py +++ b/pytcp/protocols/icmp6/options__nd.py @@ -24,6 +24,7 @@ ############################################################################ # pylint: disable=fixme +# pylint: disable=redefined-builtin """ @@ -38,10 +39,11 @@ from __future__ import annotations import struct +from dataclasses import dataclass from typing import override from pytcp.lib.enum import ProtoEnum -from pytcp.lib.ip6_address import Ip6Network +from pytcp.lib.ip6_address import Ip6Address, Ip6Mask, Ip6Network from pytcp.lib.mac_address import MacAddress from pytcp.lib.proto import Proto @@ -88,9 +90,9 @@ # TODO: Add ICMPv6 MTU Option (5). -class Icmp6NdOptionCode(ProtoEnum): +class Icmp6NdOptionType(ProtoEnum): """ - ICMPv6 Neighbor Discovery option codes. + ICMPv6 Neighbor Discovery option types. """ SLLA = 1 @@ -107,12 +109,157 @@ def _extract(frame: bytes) -> int: ICMP6_ND_OPT_LEN__PI = 32 +@dataclass +class NdPrefixInfo: + """ + Neighbor Discovery Prefix Information. + """ + + flag_l: bool + flag_a: bool + flag_r: bool + valid_lifetime: int + preferred_lifetime: int + prefix: Ip6Network + + +class Icmp6NdOptions: + """ + ICMPv6 ND options. + """ + + _options: list[Icmp6NdOption] + + def __init__(self, *options: Icmp6NdOption) -> None: + """ + Initialize the ICMPv6 options. + """ + + self._options = list(options) + + def __len__(self) -> int: + """ + Get the options length. + """ + + return sum(len(option) for option in self._options) + + def __str__(self) -> str: + """ + Get the options log string. + """ + + return ", ".join(str(option) for option in self._options) + + def __repr__(self) -> str: + """ + Get the options representation string. + """ + + return f"TcpOptions(options={self._options!r})" + + def __bytes__(self) -> bytes: + """ + Get the options as bytes. + """ + + return b"".join(bytes(option) for option in self._options) + + def __bool__(self) -> bool: + """ + Check if the options are present. + """ + + return bool(self._options) + + @property + def slla(self) -> MacAddress | None: + """ + ICMPv6 ND option - Source Link Layer Address. + """ + + for option in self._options: + if isinstance(option, Icmp6NdOptionSlla): + return option.slla + + return None + + @property + def tlla(self) -> MacAddress | None: + """ + ICMPv6 ND option - Target Link Layer Address. + """ + + for option in self._options: + if isinstance(option, Icmp6NdOptionTlla): + return option.tlla + + return None + + @property + def pi(self) -> list[NdPrefixInfo]: + """ + ICMPv6 ND option - Prefix Information. + """ + + prefix_info_list = [] + + for option in self._options: + if isinstance(option, Icmp6NdOptionPi): + prefix_info_list.append( + NdPrefixInfo( + flag_l=option.flag_l, + flag_a=option.flag_a, + flag_r=option.flag_r, + valid_lifetime=option.valid_lifetime, + preferred_lifetime=option.preferred_lifetime, + prefix=option.prefix, + ) + ) + + return prefix_info_list + + @staticmethod + def from_bytes(frame: bytes) -> Icmp6NdOptions: + """ + Read the ICMPv6 ND options from bytes. + """ + + option__ptr = 0 + + options: list[Icmp6NdOption] = [] + while option__ptr < len(frame): + match Icmp6NdOptionType.from_frame(frame[option__ptr:]): + case Icmp6NdOptionType.SLLA: + options.append( + Icmp6NdOptionSlla.from_bytes(frame[option__ptr:]) + ) + case Icmp6NdOptionType.TLLA: + options.append( + Icmp6NdOptionTlla.from_bytes(frame[option__ptr:]) + ) + case Icmp6NdOptionType.PI: + options.append( + Icmp6NdOptionPi.from_bytes(frame[option__ptr:]) + ) + case _: + options.append( + Icmp6NdOptionUnknown.from_frame( + frame=frame[option__ptr:] + ) + ) + + option__ptr += options[-1].len + + return Icmp6NdOptions(*options) + + class Icmp6NdOption(Proto): """ Base class for ICMPv6 ND option. """ - _code: Icmp6NdOptionCode + _type: Icmp6NdOptionType _len: int @override @@ -124,12 +271,12 @@ def __len__(self) -> int: return self._len @property - def code(self) -> Icmp6NdOptionCode: + def type(self) -> Icmp6NdOptionType: """ - Get the '_code' option field. + Get the '_type' option field. """ - return self._code + return self._type @property def len(self) -> int: @@ -145,11 +292,18 @@ class Icmp6NdOptionSlla(Icmp6NdOption): ICMPv6 ND option - Source Link Layer Address (1). """ - _code = Icmp6NdOptionCode.SLLA + _type = Icmp6NdOptionType.SLLA _len = ICMP6_ND_OPT_LEN__SLLA _slla: MacAddress + def __init__(self, *, slla: MacAddress) -> None: + """ + Initialize the ICMPv6 ND option. + """ + + self._slla = slla + @override def __str__(self) -> str: """ @@ -174,7 +328,7 @@ def __bytes__(self) -> bytes: return struct.pack( "! BB 6s", - int(self._code), + int(self._type), self._len >> 3, bytes(self._slla), ) @@ -187,17 +341,32 @@ def slla(self) -> MacAddress: return self._slla + @staticmethod + def from_bytes(bytes: bytes) -> Icmp6NdOptionSlla: + """ + Read the ICMPv6 ND option from frame. + """ + + return Icmp6NdOptionSlla(slla=MacAddress(bytes[2:8])) + class Icmp6NdOptionTlla(Icmp6NdOption): """ ICMPv6 ND option - Target Link Layer Address (2). """ - _code = Icmp6NdOptionCode.TLLA + _type = Icmp6NdOptionType.TLLA _len = ICMP6_ND_OPT_LEN__TLLA _tlla: MacAddress + def __init__(self, *, tlla: MacAddress) -> None: + """ + Initialize the ICMPv6 ND option. + """ + + self._tlla = tlla + @override def __str__(self) -> str: """ @@ -222,7 +391,7 @@ def __bytes__(self) -> bytes: return struct.pack( "! BB 6s", - int(self._code), + int(self._type), self._len >> 3, bytes(self._tlla), ) @@ -235,24 +404,54 @@ def tlla(self) -> MacAddress: return self._tlla + @staticmethod + def from_bytes(bytes: bytes) -> Icmp6NdOptionTlla: + """ + Read the ICMPv6 ND option from frame. + """ + + return Icmp6NdOptionTlla(tlla=MacAddress(bytes[2:8])) + class Icmp6NdOptionPi(Icmp6NdOption): """ ICMPv6 ND option - Prefix Information (3). """ - _code = Icmp6NdOptionCode.PI + _type = Icmp6NdOptionType.PI _len = ICMP6_ND_OPT_LEN__PI _flag_l: bool _flag_a: bool _flag_r: bool - _reserved_1: int _valid_lifetime: int _preferred_lifetime: int - _reserved_2: int _prefix: Ip6Network + def __init__( + self, + *, + flag_l: bool = False, + flag_a: bool = False, + flag_r: bool = False, + valid_lifetime: int, + preferred_lifetime: int, + prefix: Ip6Network, + ) -> None: + """ + Initialize the ICMPv6 ND option. + """ + + assert 0 <= valid_lifetime <= 0xFFFFFFFF + assert 0 <= preferred_lifetime <= 0xFFFFFFFF + + self._flag_l = flag_l + self._flag_a = flag_a + self._flag_r = flag_r + self._valid_lifetime = valid_lifetime + self._preferred_lifetime = preferred_lifetime + self._prefix = prefix + @override def __str__(self) -> str: """ @@ -266,7 +465,7 @@ def __str__(self) -> str: @override def __repr__(self) -> str: """ - Get the ption representation. + Get the option representation. """ return ( @@ -287,16 +486,16 @@ def __bytes__(self) -> bytes: return struct.pack( "! BB BB L L L 16s", - int(self._code), + int(self._type), self._len >> 3, len(self._prefix.mask), (self._flag_l << 7) | (self._flag_a << 6) | (self._flag_r << 6) - | (self._reserved_1 & 0b00011111), + | (0 & 0b00011111), self._valid_lifetime, self._preferred_lifetime, - self._reserved_2, + 0, bytes(self._prefix.address), ) @@ -348,24 +547,52 @@ def prefix(self) -> Ip6Network: return self._prefix + @staticmethod + def from_bytes(bytes: bytes) -> Icmp6NdOptionPi: + """ + Read the ICMPv6 ND option from frame. + """ + + return Icmp6NdOptionPi( + flag_l=bool(bytes[3] & 0b10000000), + flag_a=bool(bytes[3] & 0b01000000), + flag_r=bool(bytes[3] & 0b00100000), + valid_lifetime=struct.unpack_from("!L", bytes, 4)[0], + preferred_lifetime=struct.unpack_from("!L", bytes, 8)[0], + prefix=Ip6Network( + (Ip6Address(bytes[16:32]), Ip6Mask(f"/{bytes[2]}")) + ), + ) + -class Icmp6NdOptionUnk(Icmp6NdOption): +class Icmp6NdOptionUnknown(Icmp6NdOption): """ ICMPv6 ND unknown option. """ - _code: Icmp6NdOptionCode + _type: Icmp6NdOptionType _len: int _data: bytes + def __init__( + self, *, type: Icmp6NdOptionType, len: int, data: bytes + ) -> None: + """ + Initialize the ICMPv6 ND unknown option. + """ + + self._type = type + self._len = len + self._data = data + @override def __str__(self) -> str: """ Get the option log string. """ - return f"unk-{self._code}-{self._len}" + return f"unk-{self._type}-{self._len}" @override def __repr__(self) -> str: @@ -374,8 +601,8 @@ def __repr__(self) -> str: """ return ( - f"Icmp6NdOptUnk(" - f"code={self._code!r}, " + f"Icmp6NdOptUnknown(" + f"type={self._type!r}, " f"len={self._len!r}, " f"data={self._data!r})" ) @@ -388,7 +615,7 @@ def __bytes__(self) -> bytes: return struct.pack( f"! BB {len(self._data)}s", - self._code, + self._type, self._len >> 3, bytes(self._data), ) @@ -400,3 +627,15 @@ def data(self) -> bytes: """ return self._data + + @staticmethod + def from_frame(frame: bytes) -> Icmp6NdOptionUnknown: + """ + Read the ICMPv6 ND option from frame. + """ + + return Icmp6NdOptionUnknown( + type=Icmp6NdOptionType.from_frame(frame), + len=len(frame), + data=frame[2:], + ) diff --git a/pytcp/protocols/icmp6/packet_handler_rx.py b/pytcp/protocols/icmp6/packet_handler_rx.py index 82a29627..ce2f8ce9 100755 --- a/pytcp/protocols/icmp6/packet_handler_rx.py +++ b/pytcp/protocols/icmp6/packet_handler_rx.py @@ -57,7 +57,6 @@ from pytcp.protocols.icmp6.assembler import ( Icmp6EchoReplyMessageAssembler, Icmp6NdNeighborAdvertisementMessageAssembler, - Icmp6NdOptTllaAssembler, ) from pytcp.protocols.icmp6.message__destination_unreachable import ( Icmp6PortUnreachableMessage, @@ -75,6 +74,7 @@ from pytcp.protocols.icmp6.message__nd__router_solicitation import ( Icmp6NdRouterSolicitationMessage, ) +from pytcp.protocols.icmp6.options__nd import Icmp6NdOptions, Icmp6NdOptionTlla from pytcp.protocols.icmp6.parser import Icmp6Parser from pytcp.protocols.ip6.header import IP6_HEADER_LEN, Ip6Next from pytcp.protocols.udp.header import UDP_HEADER_LEN @@ -123,7 +123,7 @@ def _phrx_icmp6(self, *, packet_rx: PacketRx) -> None: self.packet_stats_rx.icmp6__pre_parse += 1 try: - Icmp6Parser(packet_rx) + Icmp6Parser(packet_rx=packet_rx) except PacketValidationError as error: __debug__ and log( "icmp6", @@ -273,8 +273,8 @@ def __phrx_icmp6__nd_router_advertisement( ) # Make note of prefixes that can be used for address autoconfiguration self.icmp6_ra_prefixes = [ - (opt, packet_rx.ip6.src) - for opt in packet_rx.icmp6.message.option_pi + (option.prefix, packet_rx.ip6.src) + for option in packet_rx.icmp6.message.option_pi ] self.icmp6_ra_event.release() return @@ -347,7 +347,9 @@ def __phrx_icmp6__nd_neighbor_solicitation( flag_s=not ip6_nd_dad, # no S flag when responding to DAD request flag_o=ip6_nd_dad, # O flag when respondidng to DAD request (this is not necessary but Linux uses it) target_address=packet_rx.icmp6.message.target_address, - nd_options=[Icmp6NdOptTllaAssembler(tlla=self.mac_unicast)], + nd_options=Icmp6NdOptions( + Icmp6NdOptionTlla(tlla=self.mac_unicast), + ), ), echo_tracker=packet_rx.tracker, ) diff --git a/pytcp/protocols/icmp6/parser.py b/pytcp/protocols/icmp6/parser.py index f263cea3..fae0597c 100755 --- a/pytcp/protocols/icmp6/parser.py +++ b/pytcp/protocols/icmp6/parser.py @@ -45,9 +45,8 @@ from typing import TYPE_CHECKING, override from pytcp.lib.errors import PacketIntegrityError, PacketSanityError -from pytcp.lib.ip6_address import Ip6Address, Ip6Mask, Ip6Network +from pytcp.lib.ip6_address import Ip6Address from pytcp.lib.ip_helper import inet_cksum -from pytcp.lib.mac_address import MacAddress from pytcp.lib.proto import ProtoParser from pytcp.protocols.icmp4.message__unreachable import ( ICMP4_MESSAGE_LEN__UNREACHABLE, @@ -98,14 +97,7 @@ Icmp6NdRouterSolicitationMessage, ) from pytcp.protocols.icmp6.message__unknown import Icmp6UnknownMessage -from pytcp.protocols.icmp6.options__nd import ( - Icmp6NdOption, - Icmp6NdOptionCode, - Icmp6NdOptionPi, - Icmp6NdOptionSlla, - Icmp6NdOptionTlla, - Icmp6NdOptionUnk, -) +from pytcp.protocols.icmp6.options__nd import Icmp6NdOptions if TYPE_CHECKING: from pytcp.lib.packet import PacketRx @@ -116,7 +108,7 @@ class Icmp6IntegrityError(PacketIntegrityError): Exception raised when ICMPv6 packet integrity check fails. """ - def __init__(self, message: str): + def __init__(self, /, message: str): super().__init__("[ICMPv6] " + message) @@ -125,7 +117,7 @@ class Icmp6SanityError(PacketSanityError): Exception raised when ICMPv6 packet sanity check fails. """ - def __init__(self, message: str): + def __init__(self, /, message: str): super().__init__("[ICMPv6] " + message) @@ -134,7 +126,7 @@ class Icmp6Parser(Icmp6, ProtoParser): ICMPv6 packet parser class. """ - def __init__(self, packet_rx: PacketRx) -> None: + def __init__(self, *, packet_rx: PacketRx) -> None: """ Parse ICMPv6 packet. """ @@ -570,7 +562,7 @@ def __init__(self, /, frame: bytes) -> None: """ self._reserved = struct.unpack("! L", frame[4:8])[0] - self._nd_options = _scan_icmp6_nd_options(frame[8:]) + self._nd_options = Icmp6NdOptions.from_bytes(frame[8:]) class Icmp6NdRouterAdvertisementMessageParser( @@ -591,7 +583,7 @@ def __init__(self, /, frame: bytes) -> None: self._router_lifetime = struct.unpack("! H", frame[6:8])[0] self._reachable_time = struct.unpack("! L", frame[8:12])[0] self._retrans_timer = struct.unpack("! L", frame[12:16])[0] - self._nd_options = _scan_icmp6_nd_options(frame[16:]) + self._nd_options = Icmp6NdOptions.from_bytes(frame[16:]) class Icmp6NdNeighborSolicitationMessageParser( @@ -608,7 +600,7 @@ def __init__(self, /, frame: bytes) -> None: self._reserved = struct.unpack("! L", frame[4:8])[0] self._target_address = Ip6Address(frame[8:24]) - self._nd_options = _scan_icmp6_nd_options(frame[24:]) + self._nd_options = Icmp6NdOptions.from_bytes(frame[24:]) class Icmp6NdNeighborAdvertisementMessageParser( @@ -631,7 +623,7 @@ def __init__(self, /, frame: bytes) -> None: & 0b00011111_11111111_11111111_11111111 ) self._target_address = Ip6Address(frame[8:24]) - self._nd_options = _scan_icmp6_nd_options(frame[24:]) + self._nd_options = Icmp6NdOptions.from_bytes(frame[24:]) class Icmp6Mld2ReportMessageParser(Icmp6Mld2ReportMessage): @@ -669,78 +661,6 @@ def __init__(self, /, frame: bytes) -> None: self._code = Icmp6Code.from_frame(frame) -# -# The ICMPv6 Neighbor Discovery option classes. -# - - -class Icmp6NdOptSllaParser(Icmp6NdOptionSlla): - """ - ICMPv6 ND option parser - Source Link Layer Address (1). - """ - - def __init__(self, /, frame: bytes) -> None: - """ - Option constructor. - """ - - self._code = Icmp6NdOptionCode.from_frame(frame) - self._len = frame[1] << 3 - self._slla = MacAddress(frame[2:8]) - - -class Icmp6NdOptTllaParser(Icmp6NdOptionTlla): - """ - ICMPv6 ND option parser - Target Link Layer Address (2). - """ - - def __init__(self, /, frame: bytes) -> None: - """ - Option constructor. - """ - - self._code = Icmp6NdOptionCode.from_frame(frame) - self._len = frame[1] << 3 - self._tlla = MacAddress(frame[2:8]) - - -class Icmp6NdOptPiParser(Icmp6NdOptionPi): - """ - ICMPv6 ND option - Prefix Information (3). - """ - - def __init__(self, /, frame: bytes) -> None: - """ - Option constructor. - """ - - self._code = Icmp6NdOptionCode.from_frame(frame) - self._len = frame[1] << 3 - self._flag_l = bool(frame[3] & 0b10000000) - self._flag_a = bool(frame[3] & 0b01000000) - self._flag_r = bool(frame[3] & 0b00100000) - self._valid_lifetime = struct.unpack_from("!L", frame, 4)[0] - self._preferred_lifetime = struct.unpack_from("!L", frame, 8)[0] - self._prefix = Ip6Network( - (Ip6Address(frame[16:32]), Ip6Mask(f"/{frame[2]}")) - ) - - -class Icmp6NdOptUnkParser(Icmp6NdOptionUnk): - """ - ICMPv6 ND option - Unknown. - """ - - def __init__(self, /, frame: bytes) -> None: - """ - Option constructor. - """ - - self._code = Icmp6NdOptionCode.from_frame(frame) - self._len = frame[1] << 3 - self._data = frame[2 : self._len] - - # # The ICMPv6 Multicast support classes. # @@ -765,34 +685,3 @@ def __init__(self, raw_record: bytes) -> None: for n in range(self._number_of_sources) ] self.aux_data = raw_record[20 + 16 * self._number_of_sources :] - - -# -# Helper functions. -# - - -def _scan_icmp6_nd_options( - frame: bytes, -) -> list[Icmp6NdOption]: - """ - Create ND option list from provided frame. - """ - - option_ptr = 0 - options: list[Icmp6NdOption] = [] - - while option_ptr < len(frame): - match Icmp6NdOptionCode.from_frame(frame[option_ptr:]): - case Icmp6NdOptionCode.SLLA: - options.append(Icmp6NdOptSllaParser(frame[option_ptr:])) - case Icmp6NdOptionCode.TLLA: - options.append(Icmp6NdOptTllaParser(frame[option_ptr:])) - case Icmp6NdOptionCode.PI: - options.append(Icmp6NdOptPiParser(frame[option_ptr:])) - case _: - options.append(Icmp6NdOptUnkParser(frame[option_ptr:])) - - option_ptr += options[-1].len - - return options diff --git a/pytcp/protocols/ip4/options.py b/pytcp/protocols/ip4/options.py index 3fb8a042..57206802 100644 --- a/pytcp/protocols/ip4/options.py +++ b/pytcp/protocols/ip4/options.py @@ -107,6 +107,13 @@ def __bytes__(self) -> bytes: return b"".join(bytes(option) for option in self._options) + def __bool__(self) -> bool: + """ + Check if the options are present. + """ + + return bool(self._options) + @staticmethod def from_frame(frame: bytes) -> Ip4Options: """ diff --git a/pytcp/protocols/tcp/options.py b/pytcp/protocols/tcp/options.py index 493066fd..da28c486 100644 --- a/pytcp/protocols/tcp/options.py +++ b/pytcp/protocols/tcp/options.py @@ -116,6 +116,13 @@ def __bytes__(self) -> bytes: return b"".join(bytes(option) for option in self._options) + def __bool__(self) -> bool: + """ + Check if the options are present. + """ + + return bool(self._options) + @property def mss(self) -> int: """ diff --git a/pytcp/subsystems/nd_cache.py b/pytcp/subsystems/nd_cache.py index 77e37849..065400a2 100755 --- a/pytcp/subsystems/nd_cache.py +++ b/pytcp/subsystems/nd_cache.py @@ -49,8 +49,8 @@ from pytcp.lib.mac_address import MacAddress from pytcp.protocols.icmp6.assembler import ( Icmp6NdNeighborSolicitationMessageAssembler, - Icmp6NdOptSllaAssembler, ) +from pytcp.protocols.icmp6.options__nd import Icmp6NdOptions, Icmp6NdOptionSlla class NdCache: @@ -202,10 +202,8 @@ def _send_icmp6_neighbor_solicitation( ip6__hop=255, icmp6__message=Icmp6NdNeighborSolicitationMessageAssembler( target_address=icmp6_ns_target_address, - nd_options=[ - Icmp6NdOptSllaAssembler( - slla=stack.packet_handler.mac_unicast - ) - ], + nd_options=Icmp6NdOptions( + Icmp6NdOptionSlla(slla=stack.packet_handler.mac_unicast) + ), ), ) diff --git a/pytcp/subsystems/packet_handler.py b/pytcp/subsystems/packet_handler.py index 0f6b6a37..3c9442f7 100755 --- a/pytcp/subsystems/packet_handler.py +++ b/pytcp/subsystems/packet_handler.py @@ -70,10 +70,10 @@ Icmp6Mld2RecordType, Icmp6Mld2ReportMessageAssembler, Icmp6NdNeighborSolicitationMessageAssembler, - Icmp6NdOptSllaAssembler, Icmp6NdRouterSolicitationMessageAssembler, ) from pytcp.protocols.icmp6.message import Icmp6Message +from pytcp.protocols.icmp6.options__nd import Icmp6NdOptions, Icmp6NdOptionSlla from pytcp.protocols.icmp6.packet_handler_rx import PacketHandlerRxIcmp6 from pytcp.protocols.icmp6.packet_handler_tx import PacketHandlerTxIcmp6 from pytcp.protocols.ip4.packet_handler_rx import PacketHandlerRxIp4 @@ -606,7 +606,7 @@ def _send_icmp6_nd_dad_message( ip6__hop=255, icmp6__message=Icmp6NdNeighborSolicitationMessageAssembler( target_address=ip6_unicast_candidate, - nd_options=[], # ND DAD message has no options. + nd_options=Icmp6NdOptions(), # ND DAD message has no options. ), ) @@ -632,9 +632,9 @@ def _send_icmp6_nd_router_solicitation(self) -> None: ip6__dst=Ip6Address("ff02::2"), ip6__hop=255, icmp6__message=Icmp6NdRouterSolicitationMessageAssembler( - nd_options=[ - Icmp6NdOptSllaAssembler(slla=self.mac_unicast), - ], + nd_options=Icmp6NdOptions( + Icmp6NdOptionSlla(slla=self.mac_unicast), + ), ), ) diff --git a/tests/unit/protocols__icmp6__phtx.py b/tests/unit/protocols__icmp6__phtx.py index f22f71ef..62f9f6a6 100755 --- a/tests/unit/protocols__icmp6__phtx.py +++ b/tests/unit/protocols__icmp6__phtx.py @@ -39,10 +39,10 @@ from pytcp.protocols.icmp6.assembler import ( Icmp6EchoReplyMessageAssembler, Icmp6EchoRequestMessageAssembler, - Icmp6NdOptSllaAssembler, Icmp6NdRouterSolicitationMessageAssembler, Icmp6PortUnreachableMessageAssembler, ) +from pytcp.protocols.icmp6.options__nd import Icmp6NdOptions, Icmp6NdOptionSlla from pytcp.subsystems.packet_handler import PacketHandler from tests.unit.mock_network import ( MockNetworkSettings, @@ -173,9 +173,9 @@ def test_icmp6_phtx__ip6_icmp6_nd_router_solicitation(self) -> None: ip6__dst=self.mns.ip6_multicast_all_routers, ip6__hop=255, icmp6__message=Icmp6NdRouterSolicitationMessageAssembler( - nd_options=[ - Icmp6NdOptSllaAssembler(slla=self.mns.stack_mac_address) - ], + nd_options=Icmp6NdOptions( + Icmp6NdOptionSlla(slla=self.mns.stack_mac_address) + ), ), ) self.assertEqual(tx_status, TxStatus.PASSED__ETHERNET__TO_TX_RING)