From c0ae50b49e791e3750749404a34007d829502220 Mon Sep 17 00:00:00 2001 From: Sebastian Majewski Date: Thu, 11 Jul 2024 21:35:52 -0500 Subject: [PATCH] Converted IPv6 to use header class --- pytcp/protocols/ip6/fpa.py | 24 ++++---- pytcp/protocols/ip6/fpp.py | 20 +------ pytcp/protocols/ip6/ps.py | 117 +++++++++++++++++++++++-------------- 3 files changed, 90 insertions(+), 71 deletions(-) diff --git a/pytcp/protocols/ip6/fpa.py b/pytcp/protocols/ip6/fpa.py index 178b8895..7fc8e81d 100755 --- a/pytcp/protocols/ip6/fpa.py +++ b/pytcp/protocols/ip6/fpa.py @@ -43,7 +43,7 @@ from pytcp.lib.ip6_address import Ip6Address from pytcp.lib.proto import ProtoAssembler from pytcp.protocols.ethernet.ps import EthernetType -from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6, Ip6Next +from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6, Ip6Header, Ip6Next from pytcp.protocols.raw.fpa import RawAssembler if TYPE_CHECKING: @@ -81,15 +81,19 @@ def __init__( self._payload = ip6__payload self._tracker: Tracker = self._payload.tracker - self._ver = 6 - self._dscp = ip6__dscp - self._ecn = ip6__ecn - self._flow = ip6__flow - self._hop = ip6__hop - self._src = ip6__src - self._dst = ip6__dst - self._next = self._payload.ip6_next - self._dlen = len(self._payload) + + self._header = Ip6Header( + ver=6, + dscp=ip6__dscp, + ecn=ip6__ecn, + flow=ip6__flow, + dlen=len(self._payload), + next=self._payload.ip6_next, + hop=ip6__hop, + src=ip6__src, + dst=ip6__dst, + ) + self._hlen = IP6_HEADER_LEN @override diff --git a/pytcp/protocols/ip6/fpp.py b/pytcp/protocols/ip6/fpp.py index c1b039dc..2b9cf227 100755 --- a/pytcp/protocols/ip6/fpp.py +++ b/pytcp/protocols/ip6/fpp.py @@ -41,9 +41,8 @@ from typing import TYPE_CHECKING, override from pytcp.lib.errors import PacketIntegrityError, PacketSanityError -from pytcp.lib.ip6_address import Ip6Address from pytcp.lib.proto import ProtoParser -from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6, Ip6Next +from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6, Ip6Header if TYPE_CHECKING: from pytcp.lib.packet import PacketRx @@ -160,21 +159,8 @@ def _parse(self) -> None: Parse IPv6 packet. """ - self._ver = self._frame[0] >> 4 - self._dscp = ((self._frame[0] & 0b00001111) << 2) | ( - (self._frame[1] & 0b11000000) >> 6 - ) - self._ecn = (self._frame[1] & 0b00110000) >> 4 - self._flow = ( - ((self._frame[1] & 0b00001111) << 16) - | (self._frame[2] << 8) - | self._frame[3] - ) - self._dlen = struct.unpack("!H", self._frame[4:6])[0] - self._next = Ip6Next.from_frame(self._frame) - self._hop = self._frame[7] - self._src = Ip6Address(self._frame[8:24]) - self._dst = Ip6Address(self._frame[24:40]) + self._header = Ip6Header.from_frame(self._frame) + self._dlen = self._header.dlen @override def _validate_sanity(self) -> None: diff --git a/pytcp/protocols/ip6/ps.py b/pytcp/protocols/ip6/ps.py index 0aa4ffd5..6e08c893 100755 --- a/pytcp/protocols/ip6/ps.py +++ b/pytcp/protocols/ip6/ps.py @@ -36,6 +36,7 @@ from __future__ import annotations import struct +from dataclasses import dataclass from typing import TYPE_CHECKING, TypeAlias, override from pytcp.lib.enum import ProtoEnum @@ -104,6 +105,42 @@ def _extract(frame: bytes) -> int: return frame[6] +@dataclass +class Ip6Header: + """ + Data class representing the IPv6 header. + """ + + ver: int + dscp: int + ecn: int + flow: int + dlen: int + next: Ip6Next + hop: int + src: Ip6Address + dst: Ip6Address + + @staticmethod + def from_frame(frame: bytes) -> Ip6Header: + """ + Populate the header from the raw frame. + """ + + return Ip6Header( + ver=frame[0] >> 4, + dscp=((frame[0] & 0b00001111) << 2) + | ((frame[1] & 0b11000000) >> 6), + ecn=(frame[1] & 0b00110000) >> 4, + flow=((frame[1] & 0b00001111) << 16) | (frame[2] << 8) | frame[3], + dlen=struct.unpack("!H", frame[4:6])[0], + next=Ip6Next.from_frame(frame), + hop=frame[7], + src=Ip6Address(frame[8:24]), + dst=Ip6Address(frame[24:40]), + ) + + class Ip6(Proto): """ Base class for IPv4 packet parser and assembler. @@ -111,15 +148,7 @@ class Ip6(Proto): _ethernet_type: EthernetType = EthernetType.IP6 - _ver: int - _dscp: int - _ecn: int - _flow: int - _dlen: int - _next: Ip6Next - _hop: int - _src: Ip6Address - _dst: Ip6Address + _header: Ip6Header @override def __str__(self) -> str: @@ -128,8 +157,8 @@ def __str__(self) -> str: """ return ( - f"IPv6 {self.src} > {self.dst}, next {self.next}, " - f"flow {self.flow}, dlen {self.dlen}, hop {self.hop}" + f"IPv6 {self._header.src} > {self._header.dst}, next {self._header.next}, " + f"flow {self._header.flow}, plen {self._header.dlen}, hop {self._header.hop}" ) @override @@ -140,15 +169,15 @@ def __repr__(self) -> str: return ( "Ip6(" - f"ver={self._ver!r}, " - f"dscp={self._dscp!r}, " - f"ecn={self._ecn!r}, " - f"flow={self._flow!r}, " - f"dlen={self._dlen!r}, " - f"next={self._next!r}, " - f"hop={self._hop!r}, " - f"src={self._src!r}, " - f"dst={self._dst!r})" + f"ver={self._header.ver!r}, " + f"dscp={self._header.dscp!r}, " + f"ecn={self._header.ecn!r}, " + f"flow={self._header.flow!r}, " + f"dlen={self._header.dlen!r}, " + f"next={self._header.next!r}, " + f"hop={self._header.hop!r}, " + f"src={self._header.src!r}, " + f"dst={self._header.dst!r})" ) @override @@ -159,17 +188,17 @@ def __bytes__(self) -> bytes: return struct.pack( "! BBBB HBB 16s 16s", - self._ver << 4 | self._dscp >> 4, - self._dscp & 0b00000011 << 6 - | self._ecn << 4 - | ((self._flow & 0b000011110000000000000000) >> 16), - (self._flow & 0b000000001111111100000000) >> 8, - self._flow & 0b000000000000000011111111, - self._dlen, - int(self._next), - self._hop, - bytes(self._src), - bytes(self._dst), + self._header.ver << 4 | self._header.dscp >> 4, + self._header.dscp & 0b00000011 << 6 + | self._header.ecn << 4 + | ((self._header.flow & 0b000011110000000000000000) >> 16), + (self._header.flow & 0b000000001111111100000000) >> 8, + self._header.flow & 0b000000000000000011111111, + self._header.dlen, + int(self._header.next), + self._header.hop, + bytes(self._header.src), + bytes(self._header.dst), ) @property @@ -186,7 +215,7 @@ def ver(self) -> int: Get the '_ver' header field. """ - return self._ver + return self._header.ver @property def dscp(self) -> int: @@ -194,7 +223,7 @@ def dscp(self) -> int: Get the '_dscp' header field. """ - return self._dscp + return self._header.dscp @property def ecn(self) -> int: @@ -202,7 +231,7 @@ def ecn(self) -> int: Get the '_ecn' header field. """ - return self._ecn + return self._header.ecn @property def flow(self) -> int: @@ -210,7 +239,7 @@ def flow(self) -> int: Get the '_flow' header field. """ - return self._flow + return self._header.flow @property def dlen(self) -> int: @@ -218,7 +247,7 @@ def dlen(self) -> int: Get the '_dlen' header field. """ - return self._dlen + return self._header.dlen @property def next(self) -> Ip6Next: @@ -226,7 +255,7 @@ def next(self) -> Ip6Next: Get the '_next' header field. """ - return self._next + return self._header.next @property def hop(self) -> int: @@ -234,7 +263,7 @@ def hop(self) -> int: Get the '_hop' header field. """ - return self._hop + return self._header.hop @property def src(self) -> Ip6Address: @@ -242,7 +271,7 @@ def src(self) -> Ip6Address: Get the source address. """ - return self._src + return self._header.src @property def dst(self) -> Ip6Address: @@ -250,7 +279,7 @@ def dst(self) -> Ip6Address: Get the destination address. """ - return self._dst + return self._header.dst @property def pshdr_sum(self) -> int: @@ -261,12 +290,12 @@ def pshdr_sum(self) -> int: pseudo_header = struct.pack( "! 16s 16s L BBBB", - bytes(self._src), - bytes(self._dst), - self._dlen, + bytes(self._header.src), + bytes(self._header.dst), + self._header.dlen, 0, 0, 0, - int(self._next), + int(self._header.next), ) return sum(struct.unpack("! 5Q", pseudo_header))