Skip to content

Commit

Permalink
Converted IPv6 to use header class
Browse files Browse the repository at this point in the history
  • Loading branch information
ccie18643 committed Jul 12, 2024
1 parent 238c24f commit c0ae50b
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 71 deletions.
24 changes: 14 additions & 10 deletions pytcp/protocols/ip6/fpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from pytcp.lib.ip6_address import Ip6Address
from pytcp.lib.proto import ProtoAssembler
from pytcp.protocols.ethernet.ps import EthernetType
from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6, Ip6Next
from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6, Ip6Header, Ip6Next
from pytcp.protocols.raw.fpa import RawAssembler

if TYPE_CHECKING:
Expand Down Expand Up @@ -81,15 +81,19 @@ def __init__(

self._payload = ip6__payload
self._tracker: Tracker = self._payload.tracker
self._ver = 6
self._dscp = ip6__dscp
self._ecn = ip6__ecn
self._flow = ip6__flow
self._hop = ip6__hop
self._src = ip6__src
self._dst = ip6__dst
self._next = self._payload.ip6_next
self._dlen = len(self._payload)

self._header = Ip6Header(
ver=6,
dscp=ip6__dscp,
ecn=ip6__ecn,
flow=ip6__flow,
dlen=len(self._payload),
next=self._payload.ip6_next,
hop=ip6__hop,
src=ip6__src,
dst=ip6__dst,
)

self._hlen = IP6_HEADER_LEN

@override
Expand Down
20 changes: 3 additions & 17 deletions pytcp/protocols/ip6/fpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@
from typing import TYPE_CHECKING, override

from pytcp.lib.errors import PacketIntegrityError, PacketSanityError
from pytcp.lib.ip6_address import Ip6Address
from pytcp.lib.proto import ProtoParser
from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6, Ip6Next
from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6, Ip6Header

if TYPE_CHECKING:
from pytcp.lib.packet import PacketRx
Expand Down Expand Up @@ -160,21 +159,8 @@ def _parse(self) -> None:
Parse IPv6 packet.
"""

self._ver = self._frame[0] >> 4
self._dscp = ((self._frame[0] & 0b00001111) << 2) | (
(self._frame[1] & 0b11000000) >> 6
)
self._ecn = (self._frame[1] & 0b00110000) >> 4
self._flow = (
((self._frame[1] & 0b00001111) << 16)
| (self._frame[2] << 8)
| self._frame[3]
)
self._dlen = struct.unpack("!H", self._frame[4:6])[0]
self._next = Ip6Next.from_frame(self._frame)
self._hop = self._frame[7]
self._src = Ip6Address(self._frame[8:24])
self._dst = Ip6Address(self._frame[24:40])
self._header = Ip6Header.from_frame(self._frame)
self._dlen = self._header.dlen

@override
def _validate_sanity(self) -> None:
Expand Down
117 changes: 73 additions & 44 deletions pytcp/protocols/ip6/ps.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from __future__ import annotations

import struct
from dataclasses import dataclass
from typing import TYPE_CHECKING, TypeAlias, override

from pytcp.lib.enum import ProtoEnum
Expand Down Expand Up @@ -104,22 +105,50 @@ def _extract(frame: bytes) -> int:
return frame[6]


@dataclass
class Ip6Header:
"""
Data class representing the IPv6 header.
"""

ver: int
dscp: int
ecn: int
flow: int
dlen: int
next: Ip6Next
hop: int
src: Ip6Address
dst: Ip6Address

@staticmethod
def from_frame(frame: bytes) -> Ip6Header:
"""
Populate the header from the raw frame.
"""

return Ip6Header(
ver=frame[0] >> 4,
dscp=((frame[0] & 0b00001111) << 2)
| ((frame[1] & 0b11000000) >> 6),
ecn=(frame[1] & 0b00110000) >> 4,
flow=((frame[1] & 0b00001111) << 16) | (frame[2] << 8) | frame[3],
dlen=struct.unpack("!H", frame[4:6])[0],
next=Ip6Next.from_frame(frame),
hop=frame[7],
src=Ip6Address(frame[8:24]),
dst=Ip6Address(frame[24:40]),
)


class Ip6(Proto):
"""
Base class for IPv4 packet parser and assembler.
"""

_ethernet_type: EthernetType = EthernetType.IP6

_ver: int
_dscp: int
_ecn: int
_flow: int
_dlen: int
_next: Ip6Next
_hop: int
_src: Ip6Address
_dst: Ip6Address
_header: Ip6Header

@override
def __str__(self) -> str:
Expand All @@ -128,8 +157,8 @@ def __str__(self) -> str:
"""

return (
f"IPv6 {self.src} > {self.dst}, next {self.next}, "
f"flow {self.flow}, dlen {self.dlen}, hop {self.hop}"
f"IPv6 {self._header.src} > {self._header.dst}, next {self._header.next}, "
f"flow {self._header.flow}, plen {self._header.dlen}, hop {self._header.hop}"
)

@override
Expand All @@ -140,15 +169,15 @@ def __repr__(self) -> str:

return (
"Ip6("
f"ver={self._ver!r}, "
f"dscp={self._dscp!r}, "
f"ecn={self._ecn!r}, "
f"flow={self._flow!r}, "
f"dlen={self._dlen!r}, "
f"next={self._next!r}, "
f"hop={self._hop!r}, "
f"src={self._src!r}, "
f"dst={self._dst!r})"
f"ver={self._header.ver!r}, "
f"dscp={self._header.dscp!r}, "
f"ecn={self._header.ecn!r}, "
f"flow={self._header.flow!r}, "
f"dlen={self._header.dlen!r}, "
f"next={self._header.next!r}, "
f"hop={self._header.hop!r}, "
f"src={self._header.src!r}, "
f"dst={self._header.dst!r})"
)

@override
Expand All @@ -159,17 +188,17 @@ def __bytes__(self) -> bytes:

return struct.pack(
"! BBBB HBB 16s 16s",
self._ver << 4 | self._dscp >> 4,
self._dscp & 0b00000011 << 6
| self._ecn << 4
| ((self._flow & 0b000011110000000000000000) >> 16),
(self._flow & 0b000000001111111100000000) >> 8,
self._flow & 0b000000000000000011111111,
self._dlen,
int(self._next),
self._hop,
bytes(self._src),
bytes(self._dst),
self._header.ver << 4 | self._header.dscp >> 4,
self._header.dscp & 0b00000011 << 6
| self._header.ecn << 4
| ((self._header.flow & 0b000011110000000000000000) >> 16),
(self._header.flow & 0b000000001111111100000000) >> 8,
self._header.flow & 0b000000000000000011111111,
self._header.dlen,
int(self._header.next),
self._header.hop,
bytes(self._header.src),
bytes(self._header.dst),
)

@property
Expand All @@ -186,71 +215,71 @@ def ver(self) -> int:
Get the '_ver' header field.
"""

return self._ver
return self._header.ver

@property
def dscp(self) -> int:
"""
Get the '_dscp' header field.
"""

return self._dscp
return self._header.dscp

@property
def ecn(self) -> int:
"""
Get the '_ecn' header field.
"""

return self._ecn
return self._header.ecn

@property
def flow(self) -> int:
"""
Get the '_flow' header field.
"""

return self._flow
return self._header.flow

@property
def dlen(self) -> int:
"""
Get the '_dlen' header field.
"""

return self._dlen
return self._header.dlen

@property
def next(self) -> Ip6Next:
"""
Get the '_next' header field.
"""

return self._next
return self._header.next

@property
def hop(self) -> int:
"""
Get the '_hop' header field.
"""

return self._hop
return self._header.hop

@property
def src(self) -> Ip6Address:
"""
Get the source address.
"""

return self._src
return self._header.src

@property
def dst(self) -> Ip6Address:
"""
Get the destination address.
"""

return self._dst
return self._header.dst

@property
def pshdr_sum(self) -> int:
Expand All @@ -261,12 +290,12 @@ def pshdr_sum(self) -> int:

pseudo_header = struct.pack(
"! 16s 16s L BBBB",
bytes(self._src),
bytes(self._dst),
self._dlen,
bytes(self._header.src),
bytes(self._header.dst),
self._header.dlen,
0,
0,
0,
int(self._next),
int(self._header.next),
)
return sum(struct.unpack("! 5Q", pseudo_header))

0 comments on commit c0ae50b

Please sign in to comment.