Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transports: Now send_all_from_iterable() sends a single chunk by default #329

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/easynetwork/lowlevel/api_async/transports/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,11 @@ async def send_all_from_iterable(self, iterable_of_data: Iterable[bytes | bytear
Parameters:
iterable_of_data: An :term:`iterable` yielding the bytes to send.
"""
for data in list(iterable_of_data):
await self.send_all(data)

# By default, all chunks are concatenated and sent once.
data = b"".join(iterable_of_data)
del iterable_of_data
await self.send_all(data)


class AsyncStreamTransport(AsyncStreamWriteTransport, AsyncStreamReadTransport):
Expand Down
9 changes: 5 additions & 4 deletions src/easynetwork/lowlevel/api_sync/transports/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,11 @@ def send_all_from_iterable(self, iterable_of_data: Iterable[bytes | bytearray |
ValueError: Negative `timeout`.
TimeoutError: Operation timed out.
"""
for data in list(iterable_of_data):
with _utils.ElapsedTime() as elapsed:
self.send_all(data, timeout)
timeout = elapsed.recompute_timeout(timeout)

# By default, all chunks are concatenated and sent once.
data = b"".join(iterable_of_data)
del iterable_of_data
self.send_all(data, timeout)


class StreamTransport(StreamWriteTransport, StreamReadTransport):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,4 @@ async def test____send_all_from_iterable____concatenates_chunks_and_call_send_al
await AsyncStreamTransport.send_all_from_iterable(mock_stream_socket_adapter, chunks)

# Assert
assert mock_stream_socket_adapter.send_all.await_args_list == list(map(mocker.call, chunks))

async def test____send_all_from_iterable____single_yield____no_copy(
self,
mock_stream_socket_adapter: MagicMock,
mocker: MockerFixture,
) -> None:
# Arrange
chunk = mocker.sentinel.chunk
mock_stream_socket_adapter.send_all.return_value = None

# Act
await AsyncStreamTransport.send_all_from_iterable(mock_stream_socket_adapter, iter([chunk]))

# Assert
mock_stream_socket_adapter.send_all.assert_awaited_once_with(chunk)
assert mock_stream_socket_adapter.send_all.await_args_list == [mocker.call(b"abc")]
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,6 @@ def test____send_all_from_iterable____concatenates_chunks_and_call_send_all(
mock_time_perfcounter.side_effect = [
now,
now + 5,
now + 5,
now + 8,
now + 8,
now + 14,
]
timeout: float = 123456789
mock_transport.send_all.return_value = None
Expand All @@ -183,22 +179,5 @@ def test____send_all_from_iterable____concatenates_chunks_and_call_send_all(

# Assert
assert mock_transport.send_all.call_args_list == [
mocker.call(b"a", timeout),
mocker.call(bytearray(b"b"), timeout - 5),
mocker.call(memoryview(b"c"), timeout - 8),
mocker.call(b"abc", timeout),
]

def test____send_all_from_iterable____single_yield____no_copy(
self,
mock_transport: MagicMock,
mocker: MockerFixture,
) -> None:
# Arrange
chunk = mocker.sentinel.chunk
mock_transport.send_all.return_value = None

# Act
StreamTransport.send_all_from_iterable(mock_transport, iter([chunk]), 123456789)

# Assert
mock_transport.send_all.assert_called_once_with(chunk, 123456789)
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,9 @@ def test____send_all_from_iterable____fallback_to_send_all____sendmsg_unavailabl

# Assert
mock_transport_retry.assert_not_called()
assert mock_transport_send_all.call_args_list == list(
map(
lambda data: mocker.call(data, mocker.ANY),
[b"data", b"to", b"send"],
)
)
assert mock_transport_send_all.call_args_list == [
mocker.call(b"".join([b"data", b"to", b"send"]), mocker.ANY),
]

@pytest.mark.parametrize("SC_IOV_MAX", [-1, 0], ids=lambda p: f"SC_IOV_MAX=={p}", indirect=True)
def test____send_all_from_iterable____fallback_to_send_all____sendmsg_available_but_no_defined_limit(
Expand All @@ -448,12 +445,9 @@ def test____send_all_from_iterable____fallback_to_send_all____sendmsg_available_
# Assert
mock_transport_retry.assert_not_called()
mock_tcp_socket.sendmsg.assert_not_called()
assert mock_transport_send_all.call_args_list == list(
map(
lambda data: mocker.call(data, mocker.ANY),
[b"data", b"to", b"send"],
)
)
assert mock_transport_send_all.call_args_list == [
mocker.call(b"".join([b"data", b"to", b"send"]), mocker.ANY),
]

@pytest.mark.parametrize(
"os_error",
Expand Down