Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added low-level API #135

Merged
merged 32 commits into from
Oct 28, 2023
Merged
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e325b7b
[ADD] Added low-level sync transports (not tested)
francis-clairicia Oct 1, 2023
9d91aa1
[ADD] Added docstrings to transports
francis-clairicia Oct 1, 2023
31e4838
[ADD] Added test to transports base classes
francis-clairicia Oct 3, 2023
4d90002
[ADD] Added test to SocketStreamTransport and SocketDatagramTransport
francis-clairicia Oct 3, 2023
0bca81d
[ADD] Added test to SSLStreamTransport
francis-clairicia Oct 5, 2023
57e2c8a
[ADD] Added StreamEndpoint and DatagramEndpoint
francis-clairicia Oct 6, 2023
b46e1e8
[FIX] Internal transport's extra must store callables
francis-clairicia Oct 6, 2023
8b2807b
[UPDATE] Backport of AnyIO's typed attributes system to replace get_e…
francis-clairicia Oct 7, 2023
2090211
[FIX] Clean code
francis-clairicia Oct 7, 2023
e69bbf3
[UPDATE] TCPNetworkClient and UDPNetworkClient now use the new lowlev…
francis-clairicia Oct 7, 2023
3b5c9af
[UPDATE] New Project Tree
francis-clairicia Oct 7, 2023
c1092f6
[ADD] Added unidirectional transports
francis-clairicia Oct 8, 2023
7c09919
[FIX] Improved extra typed attributes
francis-clairicia Oct 10, 2023
06c9176
[FIX] Fixed useless copy in GeneratorStreamReader.read_until()
francis-clairicia Oct 11, 2023
5e03ac6
[FIX] Added missing tests
francis-clairicia Oct 12, 2023
2f2a167
[FIX] StreamTransport.send_eof() now always call transport.send_eof()
francis-clairicia Oct 12, 2023
9e2cbc1
[ADD] Added asynchonous transports API
francis-clairicia Oct 12, 2023
7357431
[FIX] Added missing assertions in tests
francis-clairicia Oct 13, 2023
c68c2ca
[ADD] Added low-level async servers (not tested)
francis-clairicia Oct 13, 2023
2199c0c
[FIX] low-level async servers: Improved task management
francis-clairicia Oct 19, 2023
7ef9999
[ADD] Added a ResourceGuard context manager for synchronization confl…
francis-clairicia Oct 20, 2023
a030321
[FIX] Cosmetic changes
francis-clairicia Oct 21, 2023
25eece7
[FIX] (async) stream write transports: send_all_from_iterable() do no…
francis-clairicia Oct 21, 2023
edd3b45
[TESTS] Added tests for low-level async servers
francis-clairicia Oct 22, 2023
0958c8f
Merge remote-tracking branch 'origin/main' into feature/lowlevel-api
francis-clairicia Oct 22, 2023
29d0336
[UPDATE] Migrate asynchronous backend API and their high-level interf…
francis-clairicia Oct 28, 2023
6405ce5
[UPDATE] Added missing tests
francis-clairicia Oct 28, 2023
e3c9bb9
[DOCS] FIxed documentation
francis-clairicia Oct 28, 2023
69d0374
[FIX] Default stream buffer size is now set to 16KiB
francis-clairicia Oct 28, 2023
d1ac756
[FIX] Socket adapters that use asyncio.Transport and asyncio.Datagram…
francis-clairicia Oct 28, 2023
d87b72e
[FIX] Fixed failing test on macOS
francis-clairicia Oct 28, 2023
1960cbc
[FIX] Added missing abstractmethod decorators
francis-clairicia Oct 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[ADD] Added StreamEndpoint and DatagramEndpoint
francis-clairicia committed Oct 6, 2023
commit 57e2c8a3ce3c4fd172ff4426cf2c42987e1c3390
19 changes: 19 additions & 0 deletions src/easynetwork/api_sync/lowlevel/endpoints/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2021-2023, Francis Clairicia-Rose-Claire-Josephine
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
"""Low-level endpoints module"""

from __future__ import annotations

__all__ = [] # type: list[str]
134 changes: 134 additions & 0 deletions src/easynetwork/api_sync/lowlevel/endpoints/datagram.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Copyright 2021-2023, Francis Clairicia-Rose-Claire-Josephine
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
"""Low-level endpoints module"""

from __future__ import annotations

__all__ = ["DatagramEndpoint"]

import math
from typing import Any, Generic

from ...._typevars import _ReceivedPacketT, _SentPacketT
from ....protocol import DatagramProtocol
from ..transports.abc import DatagramTransport


class DatagramEndpoint(Generic[_SentPacketT, _ReceivedPacketT]):
"""
A communication endpoint based on unreliable packets of data.
"""

__slots__ = (
"__transport",
"__protocol",
"__weakref__",
)

def __init__(self, transport: DatagramTransport, protocol: DatagramProtocol[_SentPacketT, _ReceivedPacketT]) -> None:
"""
Parameters:
transport: The data transport to use.
protocol: The :term:`protocol object` to use.
"""

if not isinstance(transport, DatagramTransport):
raise TypeError(f"Expected a DatagramTransport object, got {transport!r}")
if not isinstance(protocol, DatagramProtocol):
raise TypeError(f"Expected a DatagramProtocol object, got {protocol!r}")

self.__transport: DatagramTransport = transport
self.__protocol: DatagramProtocol[_SentPacketT, _ReceivedPacketT] = protocol

def __del__(self) -> None: # pragma: no cover
try:
transport = self.__transport
except AttributeError:
return
if not transport.is_closed():
self.close()

def is_closed(self) -> bool:
"""
Checks if :meth:`close` has been called.

Returns:
:data:`True` if the endpoint is closed.
"""
return self.__transport.is_closed()

def close(self) -> None:
"""
Closes the endpoint.
"""
self.__transport.close()

def get_extra_info(self, name: str, default: Any = None) -> Any:
"""
Returns information about the transport or underlying resources it uses.

See :meth:`.BaseTransport.get_extra_info` for details.
"""
return self.__transport.get_extra_info(name, default=default)

def send_packet(self, packet: _SentPacketT, *, timeout: float | None = None) -> None:
"""
Sends `packet` to the remote endpoint.

If `timeout` is not :data:`None`, the entire send operation will take at most `timeout` seconds.

Warning:
A timeout on a send operation is unusual.

In the case of a timeout, it is impossible to know if all the packet data has been sent.

Parameters:
packet: the Python object to send.
timeout: the allowed time (in seconds) for blocking operations.

Raises:
TimeoutError: the send operation does not end up after `timeout` seconds.
"""
if timeout is None:
timeout = math.inf

transport = self.__transport
protocol = self.__protocol

transport.send(protocol.make_datagram(packet), timeout)

def recv_packet(self, *, timeout: float | None = None) -> _ReceivedPacketT:
"""
Waits for a new packet from the remote endpoint.

If `timeout` is not :data:`None`, the entire receive operation will take at most `timeout` seconds.

Parameters:
timeout: the allowed time (in seconds) for blocking operations.

Raises:
TimeoutError: the receive operation does not end up after `timeout` seconds.
DatagramProtocolParseError: invalid data received.

Returns:
the received packet.
"""
if timeout is None:
timeout = math.inf

transport = self.__transport
protocol = self.__protocol

return protocol.build_packet_from_datagram(transport.recv(timeout))
216 changes: 216 additions & 0 deletions src/easynetwork/api_sync/lowlevel/endpoints/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# Copyright 2021-2023, Francis Clairicia-Rose-Claire-Josephine
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
"""Low-level endpoints module"""

from __future__ import annotations

__all__ = ["StreamEndpoint"]

import errno as _errno
import math
import time
from typing import Any, Generic

from ...._typevars import _ReceivedPacketT, _SentPacketT
from ....protocol import StreamProtocol
from ....tools._stream import StreamDataConsumer as _StreamDataConsumer, StreamDataProducer as _StreamDataProducer
from ....tools._utils import error_from_errno as _error_from_errno
from ..transports.abc import StreamTransport


class StreamEndpoint(Generic[_SentPacketT, _ReceivedPacketT]):
"""
A communication endpoint based on continuous stream data transport.
"""

__slots__ = (
"__transport",
"__producer",
"__consumer",
"__max_recv_size",
"__eof_sent",
"__eof_reached",
"__weakref__",
)

def __init__(
self,
transport: StreamTransport,
protocol: StreamProtocol[_SentPacketT, _ReceivedPacketT],
max_recv_size: int,
) -> None:
"""
Parameters:
transport: The data transport to use.
protocol: The :term:`protocol object` to use.
max_recv_size: Read buffer size.
"""

if not isinstance(transport, StreamTransport):
raise TypeError(f"Expected a StreamTransport object, got {transport!r}")
if not isinstance(max_recv_size, int) or max_recv_size <= 0:
raise ValueError("'max_recv_size' must be a strictly positive integer")

self.__producer: _StreamDataProducer[_SentPacketT] = _StreamDataProducer(protocol)
self.__consumer: _StreamDataConsumer[_ReceivedPacketT] = _StreamDataConsumer(protocol)
self.__transport: StreamTransport = transport
self.__max_recv_size: int = max_recv_size
self.__eof_sent: bool = False
self.__eof_reached: bool = False

def __del__(self) -> None: # pragma: no cover
try:
transport = self.__transport
except AttributeError:
return
if not transport.is_closed():
self.close()

def is_closed(self) -> bool:
"""
Checks if :meth:`close` has been called.

Returns:
:data:`True` if the endpoint is closed.
"""
return self.__transport.is_closed()

def close(self) -> None:
"""
Closes the endpoint.
"""
self.__transport.close()
self.__consumer.clear()
self.__producer.clear()

def get_extra_info(self, name: str, default: Any = None) -> Any:
"""
Returns information about the transport or underlying resources it uses.

See :meth:`.BaseTransport.get_extra_info` for details.
"""
return self.__transport.get_extra_info(name, default=default)

def send_packet(self, packet: _SentPacketT, *, timeout: float | None = None) -> None:
"""
Sends `packet` to the remote endpoint.

If `timeout` is not :data:`None`, the entire send operation will take at most `timeout` seconds.

Warning:
A timeout on a send operation is unusual unless you have a SSL/TLS context.

In the case of a timeout, it is impossible to know if all the packet data has been sent.
This would leave the connection in an inconsistent state.

Parameters:
packet: the Python object to send.
timeout: the allowed time (in seconds) for blocking operations.

Raises:
TimeoutError: the send operation does not end up after `timeout` seconds.
RuntimeError: :meth:`send_eof` has been called earlier.
"""
if self.__eof_sent:
raise RuntimeError("send_eof() has been called earlier")

if timeout is None:
timeout = math.inf

transport = self.__transport
producer = self.__producer

producer.enqueue(packet)
transport.send_all_from_iterable(producer, timeout)

def send_eof(self) -> None:
"""
Close the write end of the stream after the buffered write data is flushed.

Can be safely called multiple times.
"""
if self.__eof_sent:
return

transport = self.__transport
producer = self.__producer

if not transport.is_closed():
transport.send_eof()
self.__eof_sent = True
producer.clear()

def recv_packet(self, *, timeout: float | None = None) -> _ReceivedPacketT:
"""
Waits for a new packet to arrive from the remote endpoint.

If `timeout` is not :data:`None`, the entire receive operation will take at most `timeout` seconds.

Parameters:
timeout: the allowed time (in seconds) for blocking operations.

Raises:
TimeoutError: the receive operation does not end up after `timeout` seconds.
EOFError: the read end of the stream is closed.
StreamProtocolParseError: invalid data received.

Returns:
the received packet.
"""
if timeout is None:
timeout = math.inf

transport = self.__transport
consumer = self.__consumer

try:
return next(consumer) # If there is enough data from last call to create a packet, return immediately
except StopIteration:
pass

if self.__eof_reached:
raise EOFError("end-of-stream")

bufsize: int = self.__max_recv_size
perf_counter = time.perf_counter # pull function to local namespace

while True:
_start = perf_counter()
chunk: bytes = transport.recv(bufsize, timeout)
_end = perf_counter()
if not chunk:
self.__eof_reached = True
raise EOFError("end-of-stream")
try:
consumer.feed(chunk)
finally:
del chunk
with consumer.get_buffer() as buffer:
buffer_not_full: bool = buffer.nbytes < bufsize
try:
return next(consumer)
except StopIteration:
if timeout > 0:
timeout -= _end - _start
timeout = max(timeout, 0.0)
elif buffer_not_full:
break
# Loop break
raise _error_from_errno(_errno.ETIMEDOUT)

@property
def max_recv_size(self) -> int:
"""Read buffer size. Read-only attribute."""
return self.__max_recv_size
Original file line number Diff line number Diff line change
@@ -117,7 +117,6 @@ def _retry(
else:
is_retry_interval = True
wait_time = retry_interval
available: bool
with self._selector_factory() as selector:
try:
selector.register(fileno, event)
21 changes: 0 additions & 21 deletions tests/unit_test/conftest.py
Original file line number Diff line number Diff line change
@@ -8,7 +8,6 @@
from easynetwork.converter import AbstractPacketConverterComposite
from easynetwork.protocol import DatagramProtocol, StreamProtocol
from easynetwork.serializers.abc import AbstractIncrementalPacketSerializer, AbstractPacketSerializer
from easynetwork.tools._stream import StreamDataConsumer, StreamDataProducer

import pytest

@@ -166,23 +165,3 @@ def mock_stream_protocol_factory(mocker: MockerFixture) -> Callable[[], Any]:
@pytest.fixture
def mock_stream_protocol(mock_stream_protocol_factory: Callable[[], Any]) -> Any:
return mock_stream_protocol_factory()


@pytest.fixture
def mock_stream_data_producer_factory(mocker: MockerFixture) -> Callable[[], Any]:
return lambda: mocker.NonCallableMagicMock(spec=StreamDataProducer)


@pytest.fixture
def mock_stream_data_producer(mock_stream_data_producer_factory: Callable[[], Any]) -> Any:
return mock_stream_data_producer_factory()


@pytest.fixture
def mock_stream_data_consumer_factory(mocker: MockerFixture) -> Callable[[], Any]:
return lambda: mocker.NonCallableMagicMock(spec=StreamDataConsumer)


@pytest.fixture
def mock_stream_data_consumer(mock_stream_data_consumer_factory: Callable[[], Any]) -> Any:
return mock_stream_data_consumer_factory()
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
from __future__ import annotations

import math
from typing import TYPE_CHECKING, Any

from easynetwork.api_sync.lowlevel.endpoints.datagram import DatagramEndpoint
from easynetwork.api_sync.lowlevel.transports.abc import DatagramTransport

import pytest

if TYPE_CHECKING:
from unittest.mock import MagicMock

from pytest_mock import MockerFixture


class TestDatagramEndpoint:
@pytest.fixture
@staticmethod
def mock_datagram_transport(mocker: MockerFixture) -> MagicMock:
mock_datagram_transport = mocker.NonCallableMagicMock(spec=DatagramTransport)
mock_datagram_transport.is_closed.return_value = False

def close_side_effect() -> None:
mock_datagram_transport.is_closed.return_value = True

mock_datagram_transport.close.side_effect = close_side_effect
return mock_datagram_transport

@pytest.fixture
@staticmethod
def mock_datagram_protocol(mock_datagram_protocol: MagicMock, mocker: MockerFixture) -> MagicMock:
def make_datagram_side_effect(packet: Any) -> bytes:
return str(packet).encode("ascii").removeprefix(b"sentinel.")

def build_packet_from_datagram_side_effect(data: bytes) -> Any:
return getattr(mocker.sentinel, data.decode("ascii"))

mock_datagram_protocol.make_datagram.side_effect = make_datagram_side_effect
mock_datagram_protocol.build_packet_from_datagram.side_effect = build_packet_from_datagram_side_effect
return mock_datagram_protocol

@pytest.fixture
@staticmethod
def endpoint(mock_datagram_transport: MagicMock, mock_datagram_protocol: MagicMock) -> DatagramEndpoint[Any, Any]:
return DatagramEndpoint(mock_datagram_transport, mock_datagram_protocol)

@pytest.fixture(
params=[
pytest.param(None, id="blocking (None)"),
pytest.param(math.inf, id="blocking (+inf)"),
pytest.param(0, id="non_blocking"),
pytest.param(123456789, id="with_timeout"),
]
)
@staticmethod
def recv_timeout(request: Any) -> Any:
return request.param

@pytest.fixture
@staticmethod
def expected_recv_timeout(recv_timeout: float | None) -> float:
if recv_timeout is None:
return math.inf
return recv_timeout

@pytest.fixture(
params=[
pytest.param(None, id="blocking (None)"),
pytest.param(math.inf, id="blocking (+inf)"),
pytest.param(0, id="non_blocking"),
pytest.param(123456789, id="with_timeout"),
]
)
@staticmethod
def send_timeout(request: Any) -> Any:
return request.param

@pytest.fixture
@staticmethod
def expected_send_timeout(send_timeout: float | None) -> float:
if send_timeout is None:
return math.inf
return send_timeout

def test____dunder_init____invalid_transport(
self,
mock_datagram_protocol: MagicMock,
mocker: MockerFixture,
) -> None:
# Arrange
mock_invalid_transport = mocker.NonCallableMagicMock(spec=object)

# Act & Assert
with pytest.raises(TypeError, match=r"^Expected a DatagramTransport object, got .*$"):
_ = DatagramEndpoint(mock_invalid_transport, mock_datagram_protocol)

def test____dunder_init____invalid_protocol(
self,
mock_datagram_transport: MagicMock,
mocker: MockerFixture,
) -> None:
# Arrange
mock_invalid_protocol = mocker.NonCallableMagicMock(spec=object)

# Act & Assert
with pytest.raises(TypeError, match=r"^Expected a DatagramProtocol object, got .*$"):
_ = DatagramEndpoint(mock_datagram_transport, mock_invalid_protocol)

@pytest.mark.parametrize("transport_closed", [False, True])
def test___is_closed____default(
self,
endpoint: DatagramEndpoint[Any, Any],
mock_datagram_transport: MagicMock,
transport_closed: bool,
) -> None:
# Arrange
mock_datagram_transport.is_closed.assert_not_called()
mock_datagram_transport.is_closed.return_value = transport_closed

# Act
state = endpoint.is_closed()

# Assert
mock_datagram_transport.is_closed.assert_called_once_with()
assert state is transport_closed

def test___close____default(self, endpoint: DatagramEndpoint[Any, Any], mock_datagram_transport: MagicMock) -> None:
# Arrange
mock_datagram_transport.close.assert_not_called()

# Act
endpoint.close()

# Assert
mock_datagram_transport.close.assert_called_once_with()

def test____get_extra_info____default(
self,
endpoint: DatagramEndpoint[Any, Any],
mock_datagram_transport: MagicMock,
mocker: MockerFixture,
) -> None:
# Arrange
mock_datagram_transport.get_extra_info.return_value = mocker.sentinel.extra_info

# Act
value = endpoint.get_extra_info(mocker.sentinel.name, default=mocker.sentinel.default)

# Assert
mock_datagram_transport.get_extra_info.assert_called_once_with(mocker.sentinel.name, default=mocker.sentinel.default)
assert value is mocker.sentinel.extra_info

def test____send_packet____send_bytes_to_transport(
self,
send_timeout: float | None,
expected_send_timeout: float,
endpoint: DatagramEndpoint[Any, Any],
mock_datagram_transport: MagicMock,
mock_datagram_protocol: MagicMock,
mocker: MockerFixture,
) -> None:
# Arrange
mock_datagram_transport.send.return_value = None

# Act
endpoint.send_packet(mocker.sentinel.packet, timeout=send_timeout)

# Assert
mock_datagram_protocol.make_datagram.assert_called_once_with(mocker.sentinel.packet)
mock_datagram_transport.send.assert_called_once_with(b"packet", expected_send_timeout)

def test____recv_packet____receive_bytes_from_transport(
self,
endpoint: DatagramEndpoint[Any, Any],
recv_timeout: float | None,
expected_recv_timeout: float,
mock_datagram_transport: MagicMock,
mock_datagram_protocol: MagicMock,
mocker: MockerFixture,
) -> None:
# Arrange
mock_datagram_transport.recv.side_effect = [b"packet"]

# Act
packet: Any = endpoint.recv_packet(timeout=recv_timeout)

# Assert
mock_datagram_transport.recv.assert_called_once_with(expected_recv_timeout)
mock_datagram_protocol.build_packet_from_datagram.assert_called_once_with(b"packet")
assert packet is mocker.sentinel.packet

Large diffs are not rendered by default.