From 35ab3131f4851b5631b3ed65aa1fc55142f669cf Mon Sep 17 00:00:00 2001 From: Sebastian Majewski Date: Sun, 7 Jul 2024 09:50:01 -0500 Subject: [PATCH] Cleanup - all working but Ipv6 fragmentation --- pytcp/protocols/icmp6/fpa.py | 4 +- pytcp/protocols/icmp6/phrx.py | 5 +- pytcp/protocols/icmp6/ps.py | 12 +- pytcp/protocols/ip4/fpa.py | 6 +- pytcp/protocols/ip4/phrx.py | 2 +- pytcp/protocols/ip4/ps.py | 11 ++ pytcp/protocols/ip6/fpa.py | 141 ++++--------------- pytcp/protocols/ip6/fpp.py | 184 +++++++------------------ pytcp/protocols/ip6/phrx.py | 25 ++-- pytcp/protocols/ip6/phtx.py | 6 +- pytcp/protocols/ip6/ps.py | 204 ++++++++++++++++++++++++++-- pytcp/protocols/ip6_ext_frag/fpa.py | 29 +--- pytcp/protocols/raw/fpa.py | 7 - pytcp/protocols/raw/ps.py | 31 +++++ pytcp/protocols/tcp/fpa.py | 5 - pytcp/protocols/tcp/ps.py | 21 +++ pytcp/protocols/udp/fpa.py | 5 - pytcp/protocols/udp/ps.py | 21 +++ 18 files changed, 391 insertions(+), 328 deletions(-) diff --git a/pytcp/protocols/icmp6/fpa.py b/pytcp/protocols/icmp6/fpa.py index 46ca94c6..90e47938 100755 --- a/pytcp/protocols/icmp6/fpa.py +++ b/pytcp/protocols/icmp6/fpa.py @@ -69,7 +69,7 @@ Icmp6NdRouterSolicitationMessage, Icmp6PortUnreachableMessage, ) -from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, IP6_NEXT_ICMP6 +from pytcp.protocols.ip6.ps import IP6_HEADER_LEN if TYPE_CHECKING: from pytcp.lib.mac_address import MacAddress @@ -80,8 +80,6 @@ class Icmp6Assembler(Icmp6, ProtoAssembler): ICMPv6 packet assembler support class. """ - ip6_next = IP6_NEXT_ICMP6 - def __init__( self, *, diff --git a/pytcp/protocols/icmp6/phrx.py b/pytcp/protocols/icmp6/phrx.py index 9f3d3a90..17ecd141 100755 --- a/pytcp/protocols/icmp6/phrx.py +++ b/pytcp/protocols/icmp6/phrx.py @@ -63,7 +63,7 @@ Icmp6NdRouterSolicitationMessage, Icmp6PortUnreachableMessage, ) -from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, IP6_NEXT_UDP +from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6Next from pytcp.protocols.udp.metadata import UdpMetadata from pytcp.protocols.udp.ps import UDP_HEADER_LEN @@ -82,6 +82,7 @@ class PacketHandlerRxIcmp6(ABC): from pytcp.lib.tracker import Tracker from pytcp.lib.tx_status import TxStatus from pytcp.protocols.icmp6.ps import Icmp6Message + from pytcp.protocols.ip6.ps import Ip6Next packet_stats_rx: PacketStatsRx icmp6_ra_event: Semaphore @@ -154,7 +155,7 @@ def __phrx_icmp6__port_unreachable(self, *, packet_rx: PacketRx) -> None: if ( len(frame) >= IP6_HEADER_LEN + UDP_HEADER_LEN and frame[0] >> 4 == 6 - and frame[6] == IP6_NEXT_UDP + and frame[6] == int(Ip6Next.UDP) ): # Create UdpMetadata object and try to find matching UDP socket udp_offset = IP6_HEADER_LEN diff --git a/pytcp/protocols/icmp6/ps.py b/pytcp/protocols/icmp6/ps.py index 6f7f0014..5b92d270 100755 --- a/pytcp/protocols/icmp6/ps.py +++ b/pytcp/protocols/icmp6/ps.py @@ -42,7 +42,7 @@ from pytcp.lib.ip6_address import Ip6Address, Ip6Network from pytcp.lib.mac_address import MacAddress from pytcp.lib.proto import Proto -from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, IP6_MIN_MTU +from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, IP6_MIN_MTU, Ip6Next # Destination Unreachable message (1/[0-6]) @@ -471,6 +471,8 @@ class Icmp6(Proto): Base class for ICMPv6 packet parser and assembler classes. """ + _ip6_next = Ip6Next.ICMP6 + _message: Icmp6Message @override @@ -497,6 +499,14 @@ def __bytes__(self) -> bytes: return bytes(self._message) + @property + def ip6_next(self) -> Ip6Next: + """ + Get the '_ip6_next' attribute. + """ + + return self._ip6_next + @property def type(self) -> Icmp6Type: """ diff --git a/pytcp/protocols/ip4/fpa.py b/pytcp/protocols/ip4/fpa.py index 9e8eb671..1493ae30 100755 --- a/pytcp/protocols/ip4/fpa.py +++ b/pytcp/protocols/ip4/fpa.py @@ -38,7 +38,7 @@ from __future__ import annotations import struct -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, override from pytcp import config from pytcp.lib.ip4_address import Ip4Address @@ -88,6 +88,7 @@ def __init__( assert 0 <= ip4__dscp <= 0x3F assert 0 <= ip4__ecn <= 0x03 assert 0 <= ip4__id <= 0xFFFF + assert ip4__payload.ip4_proto in Ip4Proto self._payload = ip4__payload self._tracker = self._payload.tracker @@ -110,6 +111,7 @@ def __init__( self._dlen = len(self._payload) self._plen = self._hlen + self._dlen + @override def __len__(self) -> int: """ Get length of the packet. @@ -144,8 +146,6 @@ class Ip4FragAssembler(Ip4): IPv4 packet fragment assembler support class. """ - ethernet_type = EthernetType.IP4 - def __init__( self, *, diff --git a/pytcp/protocols/ip4/phrx.py b/pytcp/protocols/ip4/phrx.py index c51d77d8..70e10559 100755 --- a/pytcp/protocols/ip4/phrx.py +++ b/pytcp/protocols/ip4/phrx.py @@ -218,7 +218,7 @@ def __defragment_ip4_packet( f"{len(self.ip4_frag_flows[flow_id]['data'][offset])}s", data, offset, - self.ip4_frag_flows[flow_id]["data"][offset], + bytes(self.ip4_frag_flows[flow_id]["data"][offset]), ) del self.ip4_frag_flows[flow_id] header[0] = 0x45 diff --git a/pytcp/protocols/ip4/ps.py b/pytcp/protocols/ip4/ps.py index b724d47b..68002d78 100755 --- a/pytcp/protocols/ip4/ps.py +++ b/pytcp/protocols/ip4/ps.py @@ -41,6 +41,7 @@ from pytcp.lib.enum import ProtoEnum from pytcp.lib.ip4_address import Ip4Address from pytcp.lib.proto import Proto +from pytcp.protocols.ethernet.ps import EthernetType if TYPE_CHECKING: from pytcp.protocols.icmp4.fpa import Icmp4Assembler @@ -104,6 +105,8 @@ class Ip4(Proto): Base class for IPv4 packet parser and assembler. """ + _ethernet_type = EthernetType.IP4 + _ver: int _hlen: int _olen: int @@ -189,6 +192,14 @@ def __bytes__(self) -> bytes: raw_options, ) + @property + def ethernet_type(self) -> EthernetType: + """ + Get the '_ethernet_type' attribute. + """ + + return self._ethernet_type + @property def ver(self) -> int: """ diff --git a/pytcp/protocols/ip6/fpa.py b/pytcp/protocols/ip6/fpa.py index 139300a1..ec44f623 100755 --- a/pytcp/protocols/ip6/fpa.py +++ b/pytcp/protocols/ip6/fpa.py @@ -37,20 +37,13 @@ from __future__ import annotations import struct -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, override from pytcp import config from pytcp.lib.ip6_address import Ip6Address +from pytcp.lib.proto import ProtoAssembler from pytcp.protocols.ethernet.ps import EthernetType -from pytcp.protocols.ip6.ps import ( - IP6_HEADER_LEN, - IP6_NEXT_EXT_FRAG, - IP6_NEXT_ICMP6, - IP6_NEXT_RAW, - IP6_NEXT_TABLE, - IP6_NEXT_TCP, - IP6_NEXT_UDP, -) +from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6, Ip6Next from pytcp.protocols.raw.fpa import RawAssembler if TYPE_CHECKING: @@ -58,7 +51,7 @@ from pytcp.protocols.ip6.ps import Ip6Payload -class Ip6Assembler: +class Ip6Assembler(Ip6, ProtoAssembler): """ IPv6 packet assembler support class. """ @@ -68,42 +61,38 @@ class Ip6Assembler: def __init__( self, *, - src: Ip6Address = Ip6Address(0), - dst: Ip6Address = Ip6Address(0), - hop: int = config.IP6_DEFAULT_HOP, - dscp: int = 0, - ecn: int = 0, - flow: int = 0, + ip6__src: Ip6Address = Ip6Address(0), + ip6__dst: Ip6Address = Ip6Address(0), + ip6__hop: int = config.IP6_DEFAULT_HOP, + ip6__dscp: int = 0, + ip6__ecn: int = 0, + ip6__flow: int = 0, ip6__payload: Ip6Payload = RawAssembler(), ) -> None: """ Class constructor. """ - assert 0 <= hop <= 0xFF - assert 0 <= dscp <= 0x3F - assert 0 <= ecn <= 0x03 - assert 0 <= flow <= 0xFFFFFF - assert ip6__payload.ip6_next in { - IP6_NEXT_ICMP6, - IP6_NEXT_UDP, - IP6_NEXT_TCP, - IP6_NEXT_EXT_FRAG, - IP6_NEXT_RAW, - } + assert 0 <= ip6__hop <= 0xFF + assert 0 <= ip6__dscp <= 0x3F + assert 0 <= ip6__ecn <= 0x03 + assert 0 <= ip6__flow <= 0xFFFFFF + assert ip6__payload.ip6_next in Ip6Next self._payload = ip6__payload self._tracker: Tracker = self._payload.tracker - self._ver: int = 6 - self._dscp: int = dscp - self._ecn: int = ecn - self._flow: int = flow - self._hop: int = hop - self._src: Ip6Address = src - self._dst: Ip6Address = dst - self._next: int = self._payload.ip6_next - self._dlen: int = len(self._payload) - + self._ver = 6 + self._dscp = ip6__dscp + self._ecn = ip6__ecn + self._flow = ip6__flow + self._hop = ip6__hop + self._src = ip6__src + self._dst = ip6__dst + self._next = self._payload.ip6_next + self._dlen = len(self._payload) + self._hlen = IP6_HEADER_LEN + + @override def __len__(self) -> int: """ Length of the packet. @@ -111,90 +100,20 @@ def __len__(self) -> int: return IP6_HEADER_LEN + len(self._payload) - def __str__(self) -> str: - """ - Packet log string. - """ - - return ( - f"IPv6 {self._src} > {self._dst}, next {self._next} " - f"({IP6_NEXT_TABLE.get(self._next, '???')}), flow {self._flow}, " - f"dlen {self._dlen}, hop {self._hop}" - ) - @property def tracker(self) -> Tracker: """ - Getter for the '_tracker' attribute.""" - return self._tracker - - @property - def dst(self) -> Ip6Address: - """ - Getter for the '_dst' attribute. - """ - return self._dst - - @property - def src(self) -> Ip6Address: - """ - Getter for the '_src' attribute. + Getter for the '_tracker' attribute. """ - return self._src - @property - def dlen(self) -> int: - """ - Getter for the '_dlen' attribute. - """ - return self._dlen - - @property - def next(self) -> int: - """ - Getter for the '_next' attribute. - """ - return self._next - - @property - def pshdr_sum(self) -> int: - """ - Returns IPv6 pseudo header that is used by TCP, UDP and ICMPv6 - to compute their checksums. - """ - pseudo_header = struct.pack( - "! 16s 16s L BBBB", - bytes(self._src), - bytes(self._dst), - self._dlen, - 0, - 0, - 0, - self._next, - ) - return sum(struct.unpack("! 5Q", pseudo_header)) + return self._tracker def assemble(self, *, frame: memoryview) -> None: """ Assemble packet into the frame. """ - struct.pack_into( - "! BBBB HBB 16s 16s", - frame, - 0, - self._ver << 4 | self._dscp >> 4, - self._dscp & 0b00000011 << 6 - | self._ecn << 4 - | ((self._flow & 0b000011110000000000000000) >> 16), - (self._flow & 0b000000001111111100000000) >> 8, - self._flow & 0b000000000000000011111111, - self._dlen, - self._next, - self._hop, - bytes(self._src), - bytes(self._dst), - ) + struct.pack_into(f"{self._hlen}s", frame, 0, bytes(self)) self._payload.assemble( frame=frame[IP6_HEADER_LEN:], diff --git a/pytcp/protocols/ip6/fpp.py b/pytcp/protocols/ip6/fpp.py index 454bec2f..32f8b587 100755 --- a/pytcp/protocols/ip6/fpp.py +++ b/pytcp/protocols/ip6/fpp.py @@ -38,11 +38,12 @@ from __future__ import annotations import struct -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, override from pytcp.lib.errors import PacketIntegrityError, PacketSanityError from pytcp.lib.ip6_address import Ip6Address -from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, IP6_NEXT_TABLE +from pytcp.lib.proto import ProtoParser +from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6, Ip6Next if TYPE_CHECKING: from pytcp.lib.packet import PacketRx @@ -53,7 +54,7 @@ class Ip6IntegrityError(PacketIntegrityError): Exception raised when IPv6 packet integrity check fails. """ - def __init__(self, message: str): + def __init__(self, /, message: str): super().__init__("[IPv6] " + message) @@ -62,131 +63,43 @@ class Ip6SanityError(PacketSanityError): Exception raised when IPv6 packet sanity check fails. """ - def __init__(self, message: str): + def __init__(self, /, message: str): super().__init__("[IPv6] " + message) -class Ip6Parser: +class Ip6Parser(Ip6, ProtoParser): """ IPv6 packet parser class. """ - def __init__(self, packet_rx: PacketRx) -> None: + def __init__(self, *, packet_rx: PacketRx) -> None: """ Class constructor. """ self._frame = packet_rx.frame - self._packet_integrity_check() - self._packet_sanity_check() + + self._validate_integrity() + self._parse() + self._validate_sanity() packet_rx.ip = packet_rx.ip6 = self packet_rx.frame = packet_rx.frame[IP6_HEADER_LEN:] + @override def __len__(self) -> int: """ - Number of bytes remaining in the frame. - """ - return len(self._frame) - - def __str__(self) -> str: - """ - Packet log string. - """ - return ( - f"IPv6 {self.src} > {self.dst}, next {self.next} " - f"({IP6_NEXT_TABLE.get(self.next, '???')}), flow {self.flow}, " - f"dlen {self.dlen}, hop {self.hop}" - ) - - @property - def ver(self) -> int: - """ - Read the 'Version' field. - """ - if "_cache__ver" not in self.__dict__: - self._cache__ver = self._frame[0] >> 4 - return self._cache__ver - - @property - def dscp(self) -> int: - """ - Read the 'DSCP' field. - """ - if "_cache__dscp" not in self.__dict__: - self._cache__dscp = ((self._frame[0] & 0b00001111) << 2) | ( - (self._frame[1] & 0b11000000) >> 6 - ) - return self._cache__dscp - - @property - def ecn(self) -> int: - """ - Read the 'ECN' field. - """ - if "_cache__ecn" not in self.__dict__: - self._cache__ecn = (self._frame[1] & 0b00110000) >> 4 - return self._cache__ecn - - @property - def flow(self) -> int: - """ - Read the 'Flow' field. - """ - if "_cache__flow" not in self.__dict__: - self._cache__flow = ( - ((self._frame[1] & 0b00001111) << 16) - | (self._frame[2] << 8) - | self._frame[3] - ) - return self._cache__flow - - @property - def dlen(self) -> int: - """ - Read the 'Data length' field. - """ - if "_cache__dlen" not in self.__dict__: - self._cache__dlen: int = struct.unpack("!H", self._frame[4:6])[0] - return self._cache__dlen - - @property - def next(self) -> int: - """ - Read the 'Next' field. - """ - return self._frame[6] - - @property - def hop(self) -> int: - """ - Read the 'Hop' field. + Get number of bytes remaining in the frame. """ - return self._frame[7] - @property - def src(self) -> Ip6Address: - """ - Read the 'Source address' field. - """ - if "_cache__src" not in self.__dict__: - self._cache__src = Ip6Address(self._frame[8:24]) - return self._cache__src - - @property - def dst(self) -> Ip6Address: - """ - Read the 'Destination address' field. - """ - if "_cache__dst" not in self.__dict__: - self._cache__dst = Ip6Address(self._frame[24:40]) - return self._cache__dst + return len(self._frame) @property def hlen(self) -> int: """ Calculate header length. """ + return IP6_HEADER_LEN @property @@ -194,6 +107,7 @@ def plen(self) -> int: """ Calculate packet length. """ + return IP6_HEADER_LEN + self.dlen @property @@ -201,54 +115,27 @@ def header_copy(self) -> bytes: """ Return copy of packet header. """ - if "_cache__header_copy" not in self.__dict__: - self._cache__header_copy = bytes(self._frame[:IP6_HEADER_LEN]) - return self._cache__header_copy + + return bytes(self._frame[:IP6_HEADER_LEN]) @property def data_copy(self) -> bytes: """ Return copy of packet data. """ - if "_cache__data_copy" not in self.__dict__: - self._cache__data_copy = bytes( - self._frame[IP6_HEADER_LEN : self.plen] - ) - return self._cache__data_copy + + return bytes(self._frame[IP6_HEADER_LEN : self.plen]) @property def packet_copy(self) -> bytes: """ Return copy of the whole packet. """ - if "_cache__packet_copy" not in self.__dict__: - self._cache__packet_copy = bytes(self._frame[: self.plen]) - return self._cache__packet_copy - @property - def pshdr_sum(self) -> int: - """ - Returns IPv6 pseudo header that is used by TCP, UDP and ICMPv6 - to compute their checksums. - """ - if "_cache__pshdr_sum" not in self.__dict__: - pseudo_header = struct.pack( - "! 16s 16s L BBBB", - bytes(self.src), - bytes(self.dst), - self.dlen, - 0, - 0, - 0, - self.next, - ) - self._cache__pshdr_sum = int( - sum(struct.unpack("! 5Q", pseudo_header)) - ) - - return self._cache__pshdr_sum + return bytes(self._frame[: self.plen]) - def _packet_integrity_check(self) -> None: + @override + def _validate_integrity(self) -> None: """ Packet integrity check to be run on raw packet prior to parsing to make sure parsing is safe. @@ -267,7 +154,30 @@ def _packet_integrity_check(self) -> None: "The wrong packet length (II)", ) - def _packet_sanity_check(self) -> None: + @override + def _parse(self) -> None: + """ + Parse IPv4 packet. + """ + + self._ver = self._frame[0] >> 4 + self._dscp = ((self._frame[0] & 0b00001111) << 2) | ( + (self._frame[1] & 0b11000000) >> 6 + ) + self._ecn = (self._frame[1] & 0b00110000) >> 4 + self._flow = ( + ((self._frame[1] & 0b00001111) << 16) + | (self._frame[2] << 8) + | self._frame[3] + ) + self._dlen = struct.unpack("!H", self._frame[4:6])[0] + self._next = Ip6Next.from_frame(self._frame) + self._hop = self._frame[7] + self._src = Ip6Address(self._frame[8:24]) + self._dst = Ip6Address(self._frame[24:40]) + + @override + def _validate_sanity(self) -> None: """ Packet sanity check to be run on parsed packet to make sure packet's fields contain sane values. diff --git a/pytcp/protocols/ip6/phrx.py b/pytcp/protocols/ip6/phrx.py index 9a16ba6f..3245269b 100755 --- a/pytcp/protocols/ip6/phrx.py +++ b/pytcp/protocols/ip6/phrx.py @@ -37,19 +37,14 @@ from __future__ import annotations -from abc import ABC, abstractproperty +from abc import ABC, abstractmethod from typing import TYPE_CHECKING from pytcp.lib.errors import PacketValidationError from pytcp.lib.logger import log from pytcp.lib.packet import PacketRx from pytcp.protocols.ip6.fpp import Ip6Parser -from pytcp.protocols.ip6.ps import ( - IP6_NEXT_EXT_FRAG, - IP6_NEXT_ICMP6, - IP6_NEXT_TCP, - IP6_NEXT_UDP, -) +from pytcp.protocols.ip6.ps import Ip6Next class PacketHandlerRxIp6(ABC): @@ -71,10 +66,12 @@ def _phrx_udp(self, *, packet_rx: PacketRx) -> None: ... def _phrx_tcp(self, *, packet_rx: PacketRx) -> None: ... - @abstractproperty + @property + @abstractmethod def ip6_unicast(self) -> list[Ip6Address]: ... - @abstractproperty + @property + @abstractmethod def ip6_multicast(self) -> list[Ip6Address]: ... def _phrx_ip6(self, *, packet_rx: PacketRx) -> None: @@ -85,7 +82,7 @@ def _phrx_ip6(self, *, packet_rx: PacketRx) -> None: self.packet_stats_rx.ip6__pre_parse += 1 try: - Ip6Parser(packet_rx) + Ip6Parser(packet_rx=packet_rx) except PacketValidationError as error: self.packet_stats_rx.ip6__failed_parse__drop += 1 __debug__ and log("ip6", f"{packet_rx.tracker} - {error}") @@ -110,25 +107,25 @@ def _phrx_ip6(self, *, packet_rx: PacketRx) -> None: if packet_rx.ip6.dst in self.ip6_multicast: self.packet_stats_rx.ip6__dst_multicast += 1 - if packet_rx.ip6.next == IP6_NEXT_EXT_FRAG: + if packet_rx.ip6.next == Ip6Next.FRAG: self._phrx_ip6_ext_frag( packet_rx=packet_rx, ) return - if packet_rx.ip6.next == IP6_NEXT_ICMP6: + if packet_rx.ip6.next == Ip6Next.ICMP6: self._phrx_icmp6( packet_rx=packet_rx, ) return - if packet_rx.ip6.next == IP6_NEXT_UDP: + if packet_rx.ip6.next == Ip6Next.UDP: self._phrx_udp( packet_rx=packet_rx, ) return - if packet_rx.ip6.next == IP6_NEXT_TCP: + if packet_rx.ip6.next == Ip6Next.TCP: self._phrx_tcp( packet_rx=packet_rx, ) diff --git a/pytcp/protocols/ip6/phtx.py b/pytcp/protocols/ip6/phtx.py index 57d5cfbe..1f38f403 100755 --- a/pytcp/protocols/ip6/phtx.py +++ b/pytcp/protocols/ip6/phtx.py @@ -135,9 +135,9 @@ def _phtx_ip6( # assemble IPv6 apcket ip6_packet_tx = Ip6Assembler( - src=ip6__src, - dst=ip6__dst, - hop=ip6__hop, + ip6__src=ip6__src, + ip6__dst=ip6__dst, + ip6__hop=ip6__hop, ip6__payload=ip6__payload, ) diff --git a/pytcp/protocols/ip6/ps.py b/pytcp/protocols/ip6/ps.py index a96a354e..9c41c0c4 100755 --- a/pytcp/protocols/ip6/ps.py +++ b/pytcp/protocols/ip6/ps.py @@ -35,7 +35,13 @@ from __future__ import annotations -from typing import TYPE_CHECKING, TypeAlias +import struct +from typing import TYPE_CHECKING, TypeAlias, override + +from pytcp.lib.enum import ProtoEnum +from pytcp.lib.ip6_address import Ip6Address +from pytcp.lib.proto import Proto +from pytcp.protocols.ethernet.ps import EthernetType if TYPE_CHECKING: from pytcp.protocols.icmp6.fpa import Icmp6Assembler @@ -79,18 +85,188 @@ IP6_HEADER_LEN = 40 -IP6_NEXT_TCP = 6 -IP6_NEXT_UDP = 17 -IP6_NEXT_EXT_FRAG = 44 -IP6_NEXT_ICMP6 = 58 -IP6_NEXT_RAW = 255 +IP6_MIN_MTU = 1280 # RFC 8200 -IP6_NEXT_TABLE = { - IP6_NEXT_TCP: "TCP", - IP6_NEXT_UDP: "UDP", - IP6_NEXT_EXT_FRAG: "FRAG", - IP6_NEXT_ICMP6: "ICMPv6", - IP6_NEXT_RAW: "raw_data", -} -IP6_MIN_MTU = 1280 # RFC 8200 +class Ip6Next(ProtoEnum): + """ + IPv6 protocol types. + """ + + TCP = 6 + UDP = 17 + FRAG = 44 + ICMP6 = 58 + RAW = 255 + + @staticmethod + def _extract(frame: bytes) -> int: + return frame[6] + + +class Ip6(Proto): + """ + Base class for IPv4 packet parser and assembler. + """ + + _ethernet_type: EthernetType = EthernetType.IP6 + + _ver: int + _dscp: int + _ecn: int + _flow: int + _dlen: int + _next: Ip6Next + _hop: int + _src: Ip6Address + _dst: Ip6Address + + @override + def __str__(self) -> str: + """ + Packet log string. + """ + + return ( + f"IPv6 {self.src} > {self.dst}, next {self.next}, " + f"flow {self.flow}, dlen {self.dlen}, hop {self.hop}" + ) + + @override + def __repr__(self) -> str: + """ + Get the packet representation string. + """ + + return ( + "Ip6(" + f"ver={self._ver!r}, " + f"dscp={self._dscp!r}, " + f"ecn={self._ecn!r}, " + f"flow={self._flow!r}, " + f"dlen={self._dlen!r}, " + f"next={self._next!r}, " + f"hop={self._hop!r}, " + f"src={self._src!r}, " + f"dst={self._dst!r})" + ) + + @override + def __bytes__(self) -> bytes: + """ + Get the packet in raw form. + """ + + return struct.pack( + "! BBBB HBB 16s 16s", + self._ver << 4 | self._dscp >> 4, + self._dscp & 0b00000011 << 6 + | self._ecn << 4 + | ((self._flow & 0b000011110000000000000000) >> 16), + (self._flow & 0b000000001111111100000000) >> 8, + self._flow & 0b000000000000000011111111, + self._dlen, + int(self._next), + self._hop, + bytes(self._src), + bytes(self._dst), + ) + + @property + def ethernet_type(self) -> EthernetType: + """ + Get the '_ethernet_type' attribute. + """ + + return self._ethernet_type + + @property + def ver(self) -> int: + """ + Get the '_ver' header field. + """ + + return self._ver + + @property + def dscp(self) -> int: + """ + Get the '_dscp' header field. + """ + + return self._dscp + + @property + def ecn(self) -> int: + """ + Get the '_ecn' header field. + """ + + return self._ecn + + @property + def flow(self) -> int: + """ + Get the '_flow' header field. + """ + + return self._flow + + @property + def dlen(self) -> int: + """ + Get the '_dlen' header field. + """ + + return self._dlen + + @property + def next(self) -> Ip6Next: + """ + Get the '_next' header field. + """ + + return self._next + + @property + def hop(self) -> int: + """ + Get the '_hop' header field. + """ + + return self._hop + + @property + def src(self) -> Ip6Address: + """ + Get the source address. + """ + + return self._src + + @property + def dst(self) -> Ip6Address: + """ + Get the destination address. + """ + + return self._dst + + @property + def pshdr_sum(self) -> int: + """ + Returns IPv6 pseudo header that is used by TCP, UDP and ICMPv6 + to compute their checksums. + """ + + pseudo_header = struct.pack( + "! 16s 16s L BBBB", + bytes(self._src), + bytes(self._dst), + self._dlen, + 0, + 0, + 0, + int(self._next), + ) + return sum(struct.unpack("! 5Q", pseudo_header)) diff --git a/pytcp/protocols/ip6_ext_frag/fpa.py b/pytcp/protocols/ip6_ext_frag/fpa.py index 77cf9475..13b4f1e5 100755 --- a/pytcp/protocols/ip6_ext_frag/fpa.py +++ b/pytcp/protocols/ip6_ext_frag/fpa.py @@ -41,17 +41,8 @@ import struct from pytcp.lib.tracker import Tracker -from pytcp.protocols.ip6.ps import ( - IP6_NEXT_EXT_FRAG, - IP6_NEXT_ICMP6, - IP6_NEXT_RAW, - IP6_NEXT_TCP, - IP6_NEXT_UDP, -) -from pytcp.protocols.ip6_ext_frag.ps import ( - IP6_EXT_FRAG_HEADER_LEN, - IP6_EXT_FRAG_NEXT_HEADER_TABLE, -) +from pytcp.protocols.ip6.ps import Ip6Next +from pytcp.protocols.ip6_ext_frag.ps import IP6_EXT_FRAG_HEADER_LEN class Ip6ExtFragAssembler: @@ -59,12 +50,12 @@ class Ip6ExtFragAssembler: IPv6 fragment extension header assembler support class. """ - ip6_next = IP6_NEXT_EXT_FRAG + ip6_next = Ip6Next.FRAG def __init__( self, *, - next: int, + next: Ip6Next, offset: int, flag_mf: bool, id: int, @@ -74,15 +65,10 @@ def __init__( Class constructor. """ - assert next in { - IP6_NEXT_ICMP6, - IP6_NEXT_UDP, - IP6_NEXT_TCP, - IP6_NEXT_RAW, - } + assert next in Ip6Next self._tracker: Tracker = Tracker(prefix="TX") - self._next: int = next + self._next: Ip6Next = next self._offset: int = offset self._flag_mf: bool = flag_mf self._id: int = id @@ -100,8 +86,7 @@ def __str__(self) -> str: return ( f"IPv6_FRAG id {self._id}{', MF' if self._flag_mf else ''}, " - f"offset {self._offset}, next {self._next} " - f"({IP6_EXT_FRAG_NEXT_HEADER_TABLE.get(self._next, '???')})" + f"offset {self._offset}, next {self._next}" ) @property diff --git a/pytcp/protocols/raw/fpa.py b/pytcp/protocols/raw/fpa.py index c4ff482d..07ce4864 100755 --- a/pytcp/protocols/raw/fpa.py +++ b/pytcp/protocols/raw/fpa.py @@ -39,9 +39,6 @@ from pytcp.lib.proto import ProtoAssembler from pytcp.lib.tracker import Tracker -from pytcp.protocols.ethernet.ps import EthernetType -from pytcp.protocols.ip4.ps import Ip4Proto -from pytcp.protocols.ip6.ps import IP6_NEXT_RAW from pytcp.protocols.raw.ps import Raw @@ -50,10 +47,6 @@ class RawAssembler(Raw, ProtoAssembler): Raw packet assembler support class. """ - ethernet_type = EthernetType.RAW - ip6_next = IP6_NEXT_RAW - ip4_proto = Ip4Proto.RAW - def __init__( self, *, data: bytes | None = None, echo_tracker: Tracker | None = None ) -> None: diff --git a/pytcp/protocols/raw/ps.py b/pytcp/protocols/raw/ps.py index 46f9123a..ebef1770 100755 --- a/pytcp/protocols/raw/ps.py +++ b/pytcp/protocols/raw/ps.py @@ -38,6 +38,9 @@ from typing import override from pytcp.lib.proto import Proto +from pytcp.protocols.ethernet.ps import EthernetType +from pytcp.protocols.ip4.ps import Ip4Proto +from pytcp.protocols.ip6.ps import Ip6Next class Raw(Proto): @@ -45,6 +48,10 @@ class Raw(Proto): Base class for raw packet parser and assembler. """ + _ethernet_type = EthernetType.RAW + _ip6_next = Ip6Next.RAW + _ip4_proto = Ip4Proto.RAW + _plen: int _data: bytes @@ -72,6 +79,30 @@ def __bytes__(self) -> bytes: return self._data + @property + def ethernet_type(self) -> EthernetType: + """ + Get the '_ethernet_type' attribute. + """ + + return self._ethernet_type + + @property + def ip6_next(self) -> Ip6Next: + """ + Get the '_ip6_next' attribute. + """ + + return self._ip6_next + + @property + def ip4_proto(self) -> Ip4Proto: + """ + Get the '_ip4_proto' attribute. + """ + + return self._ip4_proto + @property def plen(self) -> int: """ diff --git a/pytcp/protocols/tcp/fpa.py b/pytcp/protocols/tcp/fpa.py index 05dbbf90..cd61c894 100755 --- a/pytcp/protocols/tcp/fpa.py +++ b/pytcp/protocols/tcp/fpa.py @@ -42,8 +42,6 @@ from pytcp.lib.ip_helper import inet_cksum from pytcp.lib.proto import ProtoAssembler from pytcp.lib.tracker import Tracker -from pytcp.protocols.ip4.ps import Ip4Proto -from pytcp.protocols.ip6.ps import IP6_NEXT_TCP from pytcp.protocols.tcp.ps import ( TCP_HEADER_LEN, Tcp, @@ -61,9 +59,6 @@ class TcpAssembler(Tcp, ProtoAssembler): TCP packet assembler support class. """ - ip4_proto = Ip4Proto.TCP - ip6_next = IP6_NEXT_TCP - def __init__( self, *, diff --git a/pytcp/protocols/tcp/ps.py b/pytcp/protocols/tcp/ps.py index c408829b..47e183b2 100755 --- a/pytcp/protocols/tcp/ps.py +++ b/pytcp/protocols/tcp/ps.py @@ -39,6 +39,8 @@ from pytcp.lib.enum import ProtoEnum from pytcp.lib.proto import Proto +from pytcp.protocols.ip4.ps import Ip4Proto +from pytcp.protocols.ip6.ps import Ip6Next # TCP packet header (RFC 793) @@ -88,6 +90,9 @@ class Tcp(Proto): Base class for TCP packet parser and assembler. """ + _ip6_next = Ip6Next.TCP + _ip4_proto = Ip4Proto.TCP + _sport: int _dport: int _seq: int @@ -193,6 +198,22 @@ def __bytes__(self) -> bytes: self._data, ) + @property + def ip6_next(self) -> Ip6Next: + """ + Get the '_ip6_next' attribute. + """ + + return self._ip6_next + + @property + def ip4_proto(self) -> Ip4Proto: + """ + Get the '_ip4_proto' attribute. + """ + + return self._ip4_proto + @property def sport(self) -> int: """ diff --git a/pytcp/protocols/udp/fpa.py b/pytcp/protocols/udp/fpa.py index 328efbf3..5f2d99f0 100755 --- a/pytcp/protocols/udp/fpa.py +++ b/pytcp/protocols/udp/fpa.py @@ -40,8 +40,6 @@ from pytcp.lib.ip_helper import inet_cksum from pytcp.lib.proto import ProtoAssembler from pytcp.lib.tracker import Tracker -from pytcp.protocols.ip4.ps import Ip4Proto -from pytcp.protocols.ip6.ps import IP6_NEXT_UDP from pytcp.protocols.udp.ps import UDP_HEADER_LEN, Udp @@ -50,9 +48,6 @@ class UdpAssembler(Udp, ProtoAssembler): UDP packet assembler support class. """ - ip4_proto = Ip4Proto.UDP - ip6_next = IP6_NEXT_UDP - def __init__( self, *, diff --git a/pytcp/protocols/udp/ps.py b/pytcp/protocols/udp/ps.py index 41b58c51..b4673cdd 100755 --- a/pytcp/protocols/udp/ps.py +++ b/pytcp/protocols/udp/ps.py @@ -39,6 +39,8 @@ from typing import override from pytcp.lib.proto import Proto +from pytcp.protocols.ip4.ps import Ip4Proto +from pytcp.protocols.ip6.ps import Ip6Next # UDP packet header (RFC 768) @@ -57,6 +59,9 @@ class Udp(Proto): Base class for UDP packet parser and assembler. """ + _ip6_next = Ip6Next.UDP + _ip4_proto = Ip4Proto.UDP + _sport: int _dport: int _cksum: int @@ -106,6 +111,22 @@ def __bytes__(self) -> bytes: + self._data ) + @property + def ip6_next(self) -> Ip6Next: + """ + Get the '_ip6_next' attribute. + """ + + return self._ip6_next + + @property + def ip4_proto(self) -> Ip4Proto: + """ + Get the '_ip4_proto' attribute. + """ + + return self._ip4_proto + @property def sport(self) -> int: """