Skip to content

Commit

Permalink
Refectored examples code
Browse files Browse the repository at this point in the history
  • Loading branch information
ccie18643 committed Sep 8, 2024
1 parent 2e7aea6 commit 7486798
Show file tree
Hide file tree
Showing 18 changed files with 1,222 additions and 335 deletions.
119 changes: 90 additions & 29 deletions examples/icmp_echo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
## ##
################################################################################

# pylint: disable = redefined-outer-name

"""
The example 'user space' client for ICMPv4/v6 Echo.
Expand All @@ -45,8 +44,19 @@

from pytcp import TcpIpStack, initialize_interface
from pytcp.lib import stack
from pytcp.lib.ip_helper import str_to_ip
from pytcp.lib.net_addr import Ip4Address, Ip6Address
from pytcp.lib.net_addr.click_types import (
ClickTypeIp4Address,
ClickTypeIp4Host,
ClickTypeIp6Address,
ClickTypeIp6Host,
ClickTypeIpAddress,
ClickTypeMacAddress,
)
from pytcp.lib.net_addr.ip4_host import Ip4Host
from pytcp.lib.net_addr.ip6_host import Ip6Host
from pytcp.lib.net_addr.ip_address import IpAddress
from pytcp.lib.net_addr.mac_address import MacAddress
from pytcp.protocols.icmp4.message.icmp4_message__echo_request import (
Icmp4EchoRequestMessage,
)
Expand All @@ -63,16 +73,16 @@ class IcmpEchoClient:
def __init__(
self,
*,
local_ip_address: str = "0.0.0.0",
remote_ip_address: str,
local_ip_address: IpAddress,
remote_ip_address: IpAddress,
message_count: int = -1,
) -> None:
"""
Class constructor.
"""

self._local_ip_address = str_to_ip(local_ip_address)
self._remote_ip_address = str_to_ip(remote_ip_address)
self._local_ip_address = local_ip_address
self._remote_ip_address = remote_ip_address
self._message_count = message_count
self._run_thread = False

Expand All @@ -83,7 +93,7 @@ def start(self) -> None:

click.echo("Starting the ICMP Echo client.")
self._run_thread = True
threading.Thread(target=self.__thread_client).start()
threading.Thread(target=self.__thread__client).start()
time.sleep(0.1)

def stop(self) -> None:
Expand All @@ -95,7 +105,7 @@ def stop(self) -> None:
self._run_thread = False
time.sleep(0.1)

def __thread_client(self) -> None:
def __thread__client(self) -> None:
assert self._local_ip_address is not None

flow_id = random.randint(0, 65535)
Expand Down Expand Up @@ -142,43 +152,94 @@ def __thread_client(self) -> None:


@click.command()
@click.option("--interface", default="tap7")
@click.option("--mac-address", default=None)
@click.option("--ip6-address", default=None)
@click.option("--ip6-gateway", default=None)
@click.option("--ip4-address", default=None)
@click.option("--ip4-gateway", default=None)
@click.option("--remote-ip-address")
@click.option(
"--interface",
default="tap7",
help="Name of the interface to be used by the stack.",
)
@click.option(
"--mac-address",
type=ClickTypeMacAddress(),
default=None,
help="MAC address to be assigned to the interface.",
)
@click.option(
"--ip6-address",
type=ClickTypeIp6Host(),
default=None,
help="IPv6 address/mask to be assigned to the interface.",
)
@click.option(
"--ip6-gateway",
type=ClickTypeIp6Address(),
default=None,
help="IPv6 gateway address to be assigned to the interface.",
)
@click.option(
"--ip4-address",
type=ClickTypeIp4Host(),
default=None,
help="IPv4 address/mask to be assigned to the interface.",
)
@click.option(
"--ip4-gateway",
type=ClickTypeIp4Address(),
default=None,
help="IPv4 gateway address to be assigned to the interface.",
)
@click.option(
"--remote-ip-address",
type=ClickTypeIpAddress(),
required=True,
help="Remote IP address to be pinged.",
)
def cli(
*,
interface: str,
mac_address: str,
ip6_address: str,
ip6_gateway: str,
ip4_address: str,
ip4_gateway: str,
remote_ip_address: str,
mac_address: MacAddress | None,
ip6_address: Ip6Host | None,
ip6_gateway: Ip6Address | None,
ip4_address: Ip4Host | None,
ip4_gateway: Ip4Address | None,
remote_ip_address: IpAddress,
) -> None:
"""
Start PyTCP stack and stop it when user presses Ctrl-C.
Run the ICMP Echo client.
Start Icmp Echo client.
"""

fd, mtu = initialize_interface(interface)

ip6_host = (
None
if ip6_address is None
else Ip6Host(ip6_address, gateway=ip6_gateway)
)
ip4_host = (
None
if ip4_address is None
else Ip4Host(ip4_address, gateway=ip4_gateway)
)

stack = TcpIpStack(
fd=fd,
mtu=mtu,
mac_address=mac_address,
ip6_address=ip6_address,
ip6_gateway=ip6_gateway,
ip4_address=ip4_address,
ip4_gateway=ip4_gateway,
ip6_host=ip6_host,
ip4_host=ip4_host,
)

client = IcmpEchoClient(
remote_ip_address=remote_ip_address,
)
match remote_ip_address.version:
case 6:
client = IcmpEchoClient(
local_ip_address=ip6_host.address if ip6_host else Ip6Address(),
remote_ip_address=remote_ip_address,
)
case 4:
client = IcmpEchoClient(
local_ip_address=ip4_host.address if ip4_host else Ip4Address(),
remote_ip_address=remote_ip_address,
)

try:
stack.start()
Expand Down
49 changes: 22 additions & 27 deletions examples/lib/tcp_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import click

from pytcp.lib import socket
from pytcp.lib.ip_helper import ip_version
from pytcp.lib.net_addr.ip_address import IpAddress

if TYPE_CHECKING:
from pytcp.lib.socket import Socket
Expand All @@ -54,18 +54,14 @@ class TcpService:
"""

def __init__(
self, *, service_name: str, local_ip_address: str, local_port: int
self, *, service_name: str, local_ip_address: IpAddress, local_port: int
) -> None:
"""
Class constructor.
"""

self._service_name = service_name
self._local_ip_address = (
local_ip_address.split("/")[0]
if "/" in local_ip_address
else local_ip_address
)
self._local_ip_address = local_ip_address
self._local_port = local_port
self._run_thread = False

Expand All @@ -76,7 +72,7 @@ def start(self) -> None:

click.echo(f"Starting the TCP {self._service_name} service.")
self._run_thread = True
threading.Thread(target=self.__thread_service).start()
threading.Thread(target=self.__thread__service).start()
time.sleep(0.1)

def stop(self) -> None:
Expand All @@ -88,33 +84,30 @@ def stop(self) -> None:
self._run_thread = False
time.sleep(0.1)

def __thread_service(self) -> None:
def __thread__service(self) -> None:
"""
Service initialization.
"""

version = ip_version(self._local_ip_address)
if version == 6:
listening_socket = socket.socket(
family=socket.AF_INET6, type=socket.SOCK_STREAM
)
elif version == 4:
listening_socket = socket.socket(
family=socket.AF_INET4, type=socket.SOCK_STREAM
)
else:
click.echo(
f"Service TCP {self._service_name}: Invalid local IP address - "
f"{self._local_ip_address}."
)
return
match self._local_ip_address.version:
case 6:
listening_socket = socket.socket(
family=socket.AF_INET6, type=socket.SOCK_STREAM
)
case 4:
listening_socket = socket.socket(
family=socket.AF_INET4, type=socket.SOCK_STREAM
)

try:
listening_socket.bind((self._local_ip_address, self._local_port))
listening_socket.bind(
(str(self._local_ip_address), self._local_port)
)
click.echo(
f"Service TCP {self._service_name}: Socket created, bound to "
f"{self._local_ip_address}, port {self._local_port}."
)

except OSError as error:
click.echo(
f"Service TCP {self._service_name}: bind() call failed - {error!r}."
Expand All @@ -133,11 +126,13 @@ def __thread_service(self) -> None:
f"{connected_socket.remote_ip_address}, port {connected_socket.remote_port}."
)
threading.Thread(
target=self.__thread_connection,
target=self.__thread__service__connection_handler,
kwargs={"connected_socket": connected_socket},
).start()

def __thread_connection(self, *, connected_socket: Socket) -> None:
def __thread__service__connection_handler(
self, *, connected_socket: Socket
) -> None:
"""
Inbound connection handler.
"""
Expand Down
48 changes: 18 additions & 30 deletions examples/lib/udp_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@

import click

import pytcp.lib.socket as sock
from pytcp.lib.ip_helper import ip_version
from pytcp.lib import socket
from pytcp.lib.net_addr.ip_address import IpAddress

if TYPE_CHECKING:
from pytcp.lib.socket import Socket
Expand All @@ -54,22 +54,14 @@ class UdpService:
"""

def __init__(
self,
*,
service_name: str,
local_ip_address: str,
local_port: int,
self, *, service_name: str, local_ip_address: IpAddress, local_port: int
) -> None:
"""
Class constructor.
"""

self._service_name = service_name
self._local_ip_address = (
local_ip_address.split("/")[0]
if "/" in local_ip_address
else local_ip_address
)
self._local_ip_address = local_ip_address
self._local_port = local_port
self._run_thread = False

Expand All @@ -80,7 +72,7 @@ def start(self) -> None:

click.echo(f"Starting the UDP {self._service_name} service.")
self._run_thread = True
threading.Thread(target=self.__thread_service).start()
threading.Thread(target=self.__thread__service).start()
time.sleep(0.1)

def stop(self) -> None:
Expand All @@ -92,29 +84,25 @@ def stop(self) -> None:
self._run_thread = False
time.sleep(0.1)

def __thread_service(self) -> None:
def __thread__service(self) -> None:
"""
Service initialization.
"""

version = ip_version(self._local_ip_address)
if version == 6:
listening_socket = sock.socket(
family=sock.AF_INET6, type=sock.SOCK_DGRAM
)
elif version == 4:
listening_socket = sock.socket(
family=sock.AF_INET4, type=sock.SOCK_DGRAM
)
else:
click.echo(
f"Service UDP {self._service_name}: Invalid local IP address - "
f"{self._local_ip_address}."
)
return
match self._local_ip_address.version:
case 6:
listening_socket = socket.socket(
family=socket.AF_INET6, type=socket.SOCK_DGRAM
)
case 4:
listening_socket = socket.socket(
family=socket.AF_INET4, type=socket.SOCK_DGRAM
)

try:
listening_socket.bind((self._local_ip_address, self._local_port))
listening_socket.bind(
(str(self._local_ip_address), self._local_port)
)
click.echo(
f"Service UDP {self._service_name}: Socket created, bound to "
f"{self._local_ip_address}, port {self._local_port}."
Expand Down
Loading

0 comments on commit 7486798

Please sign in to comment.