Skip to content

Commit

Permalink
Cleanup - all working but Ipv6 fragmentation
Browse files Browse the repository at this point in the history
  • Loading branch information
ccie18643 committed Jul 7, 2024
1 parent 10de61a commit 35ab313
Show file tree
Hide file tree
Showing 18 changed files with 391 additions and 328 deletions.
4 changes: 1 addition & 3 deletions pytcp/protocols/icmp6/fpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
Icmp6NdRouterSolicitationMessage,
Icmp6PortUnreachableMessage,
)
from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, IP6_NEXT_ICMP6
from pytcp.protocols.ip6.ps import IP6_HEADER_LEN

if TYPE_CHECKING:
from pytcp.lib.mac_address import MacAddress
Expand All @@ -80,8 +80,6 @@ class Icmp6Assembler(Icmp6, ProtoAssembler):
ICMPv6 packet assembler support class.
"""

ip6_next = IP6_NEXT_ICMP6

def __init__(
self,
*,
Expand Down
5 changes: 3 additions & 2 deletions pytcp/protocols/icmp6/phrx.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
Icmp6NdRouterSolicitationMessage,
Icmp6PortUnreachableMessage,
)
from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, IP6_NEXT_UDP
from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6Next
from pytcp.protocols.udp.metadata import UdpMetadata
from pytcp.protocols.udp.ps import UDP_HEADER_LEN

Expand All @@ -82,6 +82,7 @@ class PacketHandlerRxIcmp6(ABC):
from pytcp.lib.tracker import Tracker
from pytcp.lib.tx_status import TxStatus
from pytcp.protocols.icmp6.ps import Icmp6Message
from pytcp.protocols.ip6.ps import Ip6Next

packet_stats_rx: PacketStatsRx
icmp6_ra_event: Semaphore
Expand Down Expand Up @@ -154,7 +155,7 @@ def __phrx_icmp6__port_unreachable(self, *, packet_rx: PacketRx) -> None:
if (
len(frame) >= IP6_HEADER_LEN + UDP_HEADER_LEN
and frame[0] >> 4 == 6
and frame[6] == IP6_NEXT_UDP
and frame[6] == int(Ip6Next.UDP)
):
# Create UdpMetadata object and try to find matching UDP socket
udp_offset = IP6_HEADER_LEN
Expand Down
12 changes: 11 additions & 1 deletion pytcp/protocols/icmp6/ps.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from pytcp.lib.ip6_address import Ip6Address, Ip6Network
from pytcp.lib.mac_address import MacAddress
from pytcp.lib.proto import Proto
from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, IP6_MIN_MTU
from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, IP6_MIN_MTU, Ip6Next

# Destination Unreachable message (1/[0-6])

Expand Down Expand Up @@ -471,6 +471,8 @@ class Icmp6(Proto):
Base class for ICMPv6 packet parser and assembler classes.
"""

_ip6_next = Ip6Next.ICMP6

_message: Icmp6Message

@override
Expand All @@ -497,6 +499,14 @@ def __bytes__(self) -> bytes:

return bytes(self._message)

@property
def ip6_next(self) -> Ip6Next:
"""
Get the '_ip6_next' attribute.
"""

return self._ip6_next

@property
def type(self) -> Icmp6Type:
"""
Expand Down
6 changes: 3 additions & 3 deletions pytcp/protocols/ip4/fpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from __future__ import annotations

import struct
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, override

from pytcp import config
from pytcp.lib.ip4_address import Ip4Address
Expand Down Expand Up @@ -88,6 +88,7 @@ def __init__(
assert 0 <= ip4__dscp <= 0x3F
assert 0 <= ip4__ecn <= 0x03
assert 0 <= ip4__id <= 0xFFFF
assert ip4__payload.ip4_proto in Ip4Proto

self._payload = ip4__payload
self._tracker = self._payload.tracker
Expand All @@ -110,6 +111,7 @@ def __init__(
self._dlen = len(self._payload)
self._plen = self._hlen + self._dlen

@override
def __len__(self) -> int:
"""
Get length of the packet.
Expand Down Expand Up @@ -144,8 +146,6 @@ class Ip4FragAssembler(Ip4):
IPv4 packet fragment assembler support class.
"""

ethernet_type = EthernetType.IP4

def __init__(
self,
*,
Expand Down
2 changes: 1 addition & 1 deletion pytcp/protocols/ip4/phrx.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def __defragment_ip4_packet(
f"{len(self.ip4_frag_flows[flow_id]['data'][offset])}s",
data,
offset,
self.ip4_frag_flows[flow_id]["data"][offset],
bytes(self.ip4_frag_flows[flow_id]["data"][offset]),
)
del self.ip4_frag_flows[flow_id]
header[0] = 0x45
Expand Down
11 changes: 11 additions & 0 deletions pytcp/protocols/ip4/ps.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from pytcp.lib.enum import ProtoEnum
from pytcp.lib.ip4_address import Ip4Address
from pytcp.lib.proto import Proto
from pytcp.protocols.ethernet.ps import EthernetType

if TYPE_CHECKING:
from pytcp.protocols.icmp4.fpa import Icmp4Assembler
Expand Down Expand Up @@ -104,6 +105,8 @@ class Ip4(Proto):
Base class for IPv4 packet parser and assembler.
"""

_ethernet_type = EthernetType.IP4

_ver: int
_hlen: int
_olen: int
Expand Down Expand Up @@ -189,6 +192,14 @@ def __bytes__(self) -> bytes:
raw_options,
)

@property
def ethernet_type(self) -> EthernetType:
"""
Get the '_ethernet_type' attribute.
"""

return self._ethernet_type

@property
def ver(self) -> int:
"""
Expand Down
141 changes: 30 additions & 111 deletions pytcp/protocols/ip6/fpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,21 @@
from __future__ import annotations

import struct
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, override

from pytcp import config
from pytcp.lib.ip6_address import Ip6Address
from pytcp.lib.proto import ProtoAssembler
from pytcp.protocols.ethernet.ps import EthernetType
from pytcp.protocols.ip6.ps import (
IP6_HEADER_LEN,
IP6_NEXT_EXT_FRAG,
IP6_NEXT_ICMP6,
IP6_NEXT_RAW,
IP6_NEXT_TABLE,
IP6_NEXT_TCP,
IP6_NEXT_UDP,
)
from pytcp.protocols.ip6.ps import IP6_HEADER_LEN, Ip6, Ip6Next
from pytcp.protocols.raw.fpa import RawAssembler

if TYPE_CHECKING:
from pytcp.lib.tracker import Tracker
from pytcp.protocols.ip6.ps import Ip6Payload


class Ip6Assembler:
class Ip6Assembler(Ip6, ProtoAssembler):
"""
IPv6 packet assembler support class.
"""
Expand All @@ -68,133 +61,59 @@ class Ip6Assembler:
def __init__(
self,
*,
src: Ip6Address = Ip6Address(0),
dst: Ip6Address = Ip6Address(0),
hop: int = config.IP6_DEFAULT_HOP,
dscp: int = 0,
ecn: int = 0,
flow: int = 0,
ip6__src: Ip6Address = Ip6Address(0),
ip6__dst: Ip6Address = Ip6Address(0),
ip6__hop: int = config.IP6_DEFAULT_HOP,
ip6__dscp: int = 0,
ip6__ecn: int = 0,
ip6__flow: int = 0,
ip6__payload: Ip6Payload = RawAssembler(),
) -> None:
"""
Class constructor.
"""

assert 0 <= hop <= 0xFF
assert 0 <= dscp <= 0x3F
assert 0 <= ecn <= 0x03
assert 0 <= flow <= 0xFFFFFF
assert ip6__payload.ip6_next in {
IP6_NEXT_ICMP6,
IP6_NEXT_UDP,
IP6_NEXT_TCP,
IP6_NEXT_EXT_FRAG,
IP6_NEXT_RAW,
}
assert 0 <= ip6__hop <= 0xFF
assert 0 <= ip6__dscp <= 0x3F
assert 0 <= ip6__ecn <= 0x03
assert 0 <= ip6__flow <= 0xFFFFFF
assert ip6__payload.ip6_next in Ip6Next

self._payload = ip6__payload
self._tracker: Tracker = self._payload.tracker
self._ver: int = 6
self._dscp: int = dscp
self._ecn: int = ecn
self._flow: int = flow
self._hop: int = hop
self._src: Ip6Address = src
self._dst: Ip6Address = dst
self._next: int = self._payload.ip6_next
self._dlen: int = len(self._payload)

self._ver = 6
self._dscp = ip6__dscp
self._ecn = ip6__ecn
self._flow = ip6__flow
self._hop = ip6__hop
self._src = ip6__src
self._dst = ip6__dst
self._next = self._payload.ip6_next
self._dlen = len(self._payload)
self._hlen = IP6_HEADER_LEN

@override
def __len__(self) -> int:
"""
Length of the packet.
"""

return IP6_HEADER_LEN + len(self._payload)

def __str__(self) -> str:
"""
Packet log string.
"""

return (
f"IPv6 {self._src} > {self._dst}, next {self._next} "
f"({IP6_NEXT_TABLE.get(self._next, '???')}), flow {self._flow}, "
f"dlen {self._dlen}, hop {self._hop}"
)

@property
def tracker(self) -> Tracker:
"""
Getter for the '_tracker' attribute."""
return self._tracker

@property
def dst(self) -> Ip6Address:
"""
Getter for the '_dst' attribute.
"""
return self._dst

@property
def src(self) -> Ip6Address:
"""
Getter for the '_src' attribute.
Getter for the '_tracker' attribute.
"""
return self._src

@property
def dlen(self) -> int:
"""
Getter for the '_dlen' attribute.
"""
return self._dlen

@property
def next(self) -> int:
"""
Getter for the '_next' attribute.
"""
return self._next

@property
def pshdr_sum(self) -> int:
"""
Returns IPv6 pseudo header that is used by TCP, UDP and ICMPv6
to compute their checksums.
"""
pseudo_header = struct.pack(
"! 16s 16s L BBBB",
bytes(self._src),
bytes(self._dst),
self._dlen,
0,
0,
0,
self._next,
)
return sum(struct.unpack("! 5Q", pseudo_header))
return self._tracker

def assemble(self, *, frame: memoryview) -> None:
"""
Assemble packet into the frame.
"""

struct.pack_into(
"! BBBB HBB 16s 16s",
frame,
0,
self._ver << 4 | self._dscp >> 4,
self._dscp & 0b00000011 << 6
| self._ecn << 4
| ((self._flow & 0b000011110000000000000000) >> 16),
(self._flow & 0b000000001111111100000000) >> 8,
self._flow & 0b000000000000000011111111,
self._dlen,
self._next,
self._hop,
bytes(self._src),
bytes(self._dst),
)
struct.pack_into(f"{self._hlen}s", frame, 0, bytes(self))

self._payload.assemble(
frame=frame[IP6_HEADER_LEN:],
Expand Down
Loading

0 comments on commit 35ab313

Please sign in to comment.