ZEO.asyncio: switch to async/await style #195

Closed
wants to merge 87 commits into from
87 commits
b577b7a
fix problems revealed by the test suite
d-maurer Apr 2, 2022
dc698a7
make doctest happy
d-maurer Apr 2, 2022
8c46a0e
- minor cleanups
dataflake Apr 4, 2022
655b1e3
- remove pypy from tox configuration [ci skip]
dataflake Apr 4, 2022
aa7dda1
limit connection waiting (to reduce test time in case of failure)
d-maurer Apr 5, 2022
b089a75
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Apr 5, 2022
d651533
Update src/ZEO/tests/ConnectionTests.py
d-maurer Apr 5, 2022
c644ba2
`uvloop` workaround
d-maurer Apr 5, 2022
2faae53
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Apr 8, 2022
7f68329
remove `msgpack` version restriction (no longer necessary as Python 2…
d-maurer Apr 8, 2022
9187165
Merge branch 'master' into asyncio3.5+
dataflake Apr 8, 2022
a9d9692
- fix merge issue [ci skip]
dataflake Apr 8, 2022
33639b9
drop tests with some storages to save test time
d-maurer Apr 9, 2022
4cacde8
bring `tox.ini` in line with `test_requires` in `setup.py`
d-maurer Apr 11, 2022
62efb98
fix connection race condition
d-maurer Apr 11, 2022
73c7871
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Apr 11, 2022
c523b85
the `asyncio` tests observe from an external thread activity controll…
d-maurer Apr 11, 2022
814cc77
fix "wait||ready-connected" race
d-maurer Apr 12, 2022
1283aef
Test loop observation: switch from "wait(DELAY)" (prone to race condi…
d-maurer Apr 12, 2022
4e20020
make `flake8` happy
d-maurer Apr 12, 2022
937d29c
Merge branch 'master' into asyncio3.5+
dataflake Apr 12, 2022
e9360d7
make `prefetch` more reliable:
d-maurer Apr 14, 2022
65dc1b6
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Apr 14, 2022
defe92f
optimize prefetch
d-maurer Apr 14, 2022
5059a2c
Python 3.10 kills a coroutine started with `run_coroutine_threadsafe`…
d-maurer Apr 14, 2022
a1e8e66
make `flake8` happy
d-maurer Apr 14, 2022
98cecdf
Python 3.10 has removed the `loop` parameter
d-maurer Apr 14, 2022
574cc35
improve `close`
d-maurer Apr 19, 2022
c46a0da
`zeo-fan-out` race condition hunting; documentation improvements
d-maurer Apr 20, 2022
bf84637
cleanup -- now that we have understood the race condition
d-maurer Apr 20, 2022
cc3a606
improve flow control test
d-maurer Apr 20, 2022
2c5c167
redesign client close logic (with `asyncio` `close` starts the closin…
d-maurer May 6, 2022
2e27b85
Avoid additional `ResourceWarning`s (2 remain due to the weakness of …
d-maurer May 11, 2022
504a95c
fix problems for Python before 3.8
d-maurer May 12, 2022
ebc229b
try to fix https://github.com/zopefoundation/ZEO/pull/195#issuecommen…
d-maurer May 13, 2022
1f61e73
add further closing tests
d-maurer May 16, 2022
ff8f0f7
capture log message in `client-config.test`
d-maurer May 16, 2022
72d815d
- experiment trying to prevent duplicate workflow runs
dataflake May 16, 2022
bdb31ce
- try to fix syntax
dataflake May 16, 2022
7cea602
improve log capturing: do not consider the registration failure log e…
d-maurer May 16, 2022
9a85643
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer May 16, 2022
9a543bc
uvloop workaround to prevent `close` deadlock
d-maurer May 17, 2022
09a4ece
clean up `tests_require` in `setup.py`
d-maurer May 18, 2022
868f189
Drop Python2 support
d-maurer May 18, 2022
61818b4
Allow to specify the process type of ZEO server processes with the en…
d-maurer May 19, 2022
0c23b5b
replace `mock` by `unittest.mock`
d-maurer May 19, 2022
80f3928
fix `unittest.mock` insufficiency in Python 3.5
d-maurer May 20, 2022
b17cdc4
improve test `zeo-fan-out.test`
d-maurer May 21, 2022
1728747
drop Python 3.5 support
d-maurer May 21, 2022
b3313fd
remove 3.5 from `tox.ini` as well (it will no longer work)
d-maurer May 21, 2022
7b77b2b
log flowcontrol (as it might be responsible for the "request blob" ha…
d-maurer May 22, 2022
463e0a1
Drop use of six
navytux May 23, 2022
93b1a81
Drop use of mock egg
d-maurer May 23, 2022
a586ef4
drop Python 3.5 support
d-maurer May 21, 2022
d455858
Drop use of manuel
navytux May 23, 2022
8060b6d
No longer pin msgpack < 1
navytux May 23, 2022
7763458
Merge branch 'master' into y/py3.1
navytux May 23, 2022
ff75bca
Sync with master
navytux May 23, 2022
cac3639
Sync with PR#200 (y/py3)
navytux May 23, 2022
e244529
optimization: switch back to futures which directly call (rather tha…
d-maurer May 27, 2022
e1cac43
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer May 27, 2022
214e209
fix for Python 3.7-
d-maurer May 27, 2022
c82021c
Merge branch 'master' into y/py3.1
navytux May 31, 2022
39ee68f
Sync with PR#200 (y/py3)
navytux May 31, 2022
28eaee5
Restore server_sync functionality
navytux May 31, 2022
72e6237
Optimize `SizedMessageProtocol` regarding readability and efficiency:
d-maurer Jun 1, 2022
8c9fa52
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Jun 1, 2022
1ffc106
break reference cycle introduced by `ClientStorage.sync`
d-maurer Jun 1, 2022
2f125ec
avoid (some) cyclic garbage in server implementation
d-maurer Jun 2, 2022
d432ff2
simplify (and slightly optimize) `TaskExecutor`
d-maurer Jun 6, 2022
a143d41
move cache access out of `ClientIO` into `Protocol` (to ensure that w…
d-maurer Jun 9, 2022
2d62007
`cythonize`d `SizedMessageProtocol` and future/task optimizations (`s…
d-maurer Jun 9, 2022
e370854
move cache access out of `ClientIO` into `Protocol` (to ensure that w…
d-maurer Jun 9, 2022
b137352
own `Future` implementation
d-maurer Jun 9, 2022
1a51f29
minor optimization
d-maurer Jun 9, 2022
b2b2486
minor optimization
d-maurer Jun 9, 2022
6fad76c
inline state constants -- for speed
d-maurer Jun 10, 2022
6ad732a
optimize `cython` implementation for `SizedMessageProtocol`
d-maurer Jun 10, 2022
58182b9
optimize asynchronous calls to the server:
d-maurer Jun 11, 2022
b77847f
simplification and cleanup
d-maurer Jun 13, 2022
2ecbd41
provide `cython` optimizations
d-maurer Jun 13, 2022
163e85d
update `CHANGES.rst`
d-maurer Jun 14, 2022
9bb7c08
Python 3.6 compatibility
d-maurer Jun 14, 2022
afae7a5
optimization: suggest thread switches at main <-> IO thread boundaries;
d-maurer Jun 16, 2022
ac97508
fix doctest
d-maurer Jun 16, 2022
d41902b
optimization: suggest thread switches via `sleep(1us)` (rather than `…
d-maurer Jun 16, 2022
30e271b
optimization: do not suggest a thread switch for asynchronous calls: …
d-maurer Jun 18, 2022
Optimize `SizedMessageProtocol` regarding readability and efficiency:
- use `nonlocal` (instead of mutable wrapping) for shared variables
- drop Python 3.5 compatibility code
- use state comparison (`if current_state is STATE: ...`) instead of state function call (`current_state()`) (overhead: 11ns instead of 66ns)
d-maurer committed Jun 1, 2022
commit 72e6237fe5fd9d21f68b66282381cb1e2f64c85e
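
For readers unfamiliar with the `nonlocal` and state-comparison changes described in the commit message above, here is a minimal standalone sketch. It is not taken from ZEO: the names `make_flag_wrapped`, `make_flag_nonlocal`, `PROCESS_SIZE`, `PROCESS_MESSAGE` and `next_state` are hypothetical and exist only for illustration.

```python
def make_flag_wrapped():
    """Older style: closures share state through a mutable list."""
    paused = []  # non-empty means "paused"

    def pause():
        paused.append(1)

    def resume():
        del paused[:]

    def is_paused():
        return bool(paused)

    return pause, resume, is_paused


def make_flag_nonlocal():
    """Newer style: closures rebind the shared variable via ``nonlocal``."""
    paused = False

    def pause():
        nonlocal paused
        paused = True

    def resume():
        nonlocal paused
        paused = False

    def is_paused():
        return paused

    return pause, resume, is_paused


# Hypothetical state constants: states are plain values compared with
# ``is`` rather than functions that have to be called.
PROCESS_SIZE = 1
PROCESS_MESSAGE = 2


def next_state(current_state):
    """Return the state that follows *current_state*."""
    if current_state is PROCESS_SIZE:
        return PROCESS_MESSAGE
    return PROCESS_SIZE
```

Both factories behave identically from the outside; the `nonlocal` variant avoids the list indirection, and the constant comparison avoids a function call per state transition. The timings quoted in the commit message are the author's and are not reproduced here.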
143 changes: 68 additions & 75 deletions src/ZEO/asyncio/base.py
@@ -33,16 +33,11 @@
"""
from asyncio import Protocol
import logging
import socket
from struct import pack
from struct import unpack
import sys
import struct


logger = logging.getLogger(__name__)

INET_FAMILIES = socket.AF_INET, socket.AF_INET6


class ZEOBaseProtocol(Protocol):
"""ZEO protocol base class for the common features."""
@@ -164,27 +159,16 @@ class SizedMessageProtocol(Protocol):
``SizedMessageProtocol`` instances can be used concurrently
from coroutines (executed in the same thread).
They are not thread safe.

Note: we would like to implement this via
``asyncio.StreamReaderProtocol`` (much clearer) but
this would require a more faithfull loop implementation
which would make testing much more difficult.
"""
def __init__(self, loop, receive):
self.receive = receive

def connection_made(self, transport):

if sys.version_info < (3, 6):
sock = transport.get_extra_info('socket')
if sock is not None and sock.family in INET_FAMILIES:
# See https://bugs.python.org/issue27456 :(
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)

self.transport = transport

# output handling
self.paused = paused = [] # non empty indicates we must buffer output
pack = struct.pack
paused = False # if ``paused`` we must buffer output
output = [] # output buffer - contains messages or iterators
append = output.append
writelines = transport.writelines
@@ -225,8 +209,8 @@ def write_message_iter(message_iter):

def resume_writing():
# precondition: ``paused`` and "at least 1 message writable"
del paused[:]
logger.debug("writing resumed")
nonlocal paused
paused = False
while output and not paused:
message = output.pop(0)
if isinstance(message, bytes):
@@ -244,73 +228,78 @@ def resume_writing():

self.resume_writing = resume_writing

def pause_writing():
nonlocal paused
paused = True

self.pause_writing = pause_writing

# input handling
# the following implements a state machine with
# states ``process_size`` and ``process_message``
received_count = [0] # number of received (not yet processed) bytes
# states ``process_size``, ``process_message`` and ``closed``
process_size = 1
process_message = 2
closed = None
read_state = process_size # current state
read_wanted = 4 # bytes required for this state
received_count = 0 # number of received (not yet processed) bytes
received_buffer = [] # received data chunks
read_state = [None] # read state: (*count*, *processor*)

def read(no):
"""return *no* bytes from the received buffer.

Precondition: the received buffer contains at least *no* bytes.
"""
need = no
data = b""
while need:
chunk = received_buffer[0]
if len(chunk) > need:
data += chunk[:need]
received_buffer[0] = chunk[need:]
need = 0
else:
del received_buffer[0]
data += chunk
need -= len(chunk)
received_count[0] -= no
return data

# the following two functions introduce a reference cycle
# broken in ``eof_received``
def process_size(size):
read_state[0] = (unpack(">I", size)[0], process_message)

def process_message(message):
try:
self.receive(message) # may close: ``read_state[0] = None``
except Exception:
logger.exception("Processing message `%r` failed" % message)
if read_state[0]: # not yet closed
read_state[0] = (4, process_size)
chunk_index = 0 # first unprocessed byte in first chunk
unpack = struct.unpack

def data_received(data):
nonlocal received_count, read_state, read_wanted, chunk_index
received_buffer.append(data)
received_count[0] += len(data)
while read_state[0] and read_state[0][0] <= received_count[0]:
no, processor = read_state[0]
processor(read(no))

received_count += len(data)
# ``not read_state`` means "closed"
while read_state and read_wanted <= received_count:
# transfer ``read_wanted`` bytes into ``data``
data = None
wanted = read_wanted
while wanted:
chunk = received_buffer[0]
ch_unprocessed = len(chunk) - chunk_index
if ch_unprocessed > wanted:
n_index = chunk_index + wanted
fragment = chunk[chunk_index:n_index]
chunk_index = n_index
wanted = 0
else:
del received_buffer[0]
fragment = \
chunk[chunk_index:] if chunk_index else chunk
chunk_index = 0
wanted -= ch_unprocessed
if data is None: # typical case
data = fragment
else:
data += fragment
received_count -= read_wanted
# process ``data``
if read_state is process_size:
read_state = process_message
read_wanted = unpack(">I", data)[0]
else: # ``read_state is process_message``
try:
self.receive(data) # may close: ``not read_state``
except Exception:
logger.exception("Processing message `%r` failed"
% data)
if read_state: # not yet closed
read_state = process_size
read_wanted = 4

# the following introduces a reference cycle broken in ``close``
self.data_received = data_received

def eof_received():
nonlocal process_size, process_message
read_state[0] = None
try:
del process_size, process_message # break reference cycle
except NameError:
pass
nonlocal read_state
read_state = closed

self.eof_received = eof_received

# start processing
read_state[0] = (4, process_size)
self.connected = True

def pause_writing(self):
logger.debug("writing paused")
self.paused.append(1)

def set_receive(self, receive):
self.receive = receive

@@ -322,15 +311,19 @@ def close(self):
self.__closed = True
self.eof_received()
self.transport.close()
self.transport = self.receive = None # break reference cycles
# break reference cycles
self.transport = self.receive = self.data_received = None

# We define ``connection_lost`` to close the transport
# in order to avoid a ``ResourceWarning``
# about an unclosed SSL transport -- it should not be necessary
# as the transport informed us about the lost connection.
# It also helps for some tests which call ``connection_lost``
# without transport intervention.
connection_lost_called = False # for tests

def connection_lost(self, exc):
self.connection_lost_called = True
if self.__closed:
return
self.transport.close()
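
The input handling in the diff above is a two-state machine: read a 4-byte big-endian size, then read that many bytes as the message, then repeat. The sketch below shows the same idea in a simplified, self-contained form. It is an illustration under assumptions, not the ZEO implementation: `FrameParser` is a hypothetical class, it buffers in a single `bytearray` instead of the chunk list with `chunk_index` used in the diff, and `sized` is assumed to behave like the helper of the same name used in the tests (its definition is not shown in this diff).

```python
import struct


def sized(message):
    """Prefix *message* with its length as a 4-byte big-endian integer."""
    return struct.pack(">I", len(message)) + message


class FrameParser:
    """Hypothetical, simplified stand-in for the protocol's input handling."""

    def __init__(self, receive):
        self.receive = receive     # callback invoked with each full message
        self.buffer = bytearray()  # received, not yet processed bytes
        self.wanted = 4            # bytes required for the current state
        self.reading_size = True   # True: size header, False: message body
        self.closed = False

    def data_received(self, data):
        self.buffer += data
        # Process as long as the current state has enough bytes available.
        while not self.closed and len(self.buffer) >= self.wanted:
            chunk = bytes(self.buffer[:self.wanted])
            del self.buffer[:self.wanted]
            if self.reading_size:
                self.wanted = struct.unpack(">I", chunk)[0]
                self.reading_size = False
            else:
                self.receive(chunk)  # may call ``close``
                self.wanted = 4
                self.reading_size = True

    def close(self):
        self.closed = True


# Usage: feed two sized messages in arbitrary 3-byte pieces.
received = []
parser = FrameParser(received.append)
data = sized(b"a test message") * 2
for start in range(0, len(data), 3):
    parser.data_received(data[start:start + 3])
```

Like the code in the diff, the loop rechecks the closed flag after every message, so a `receive` callback that closes the parser stops further processing of data that is already buffered.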
52 changes: 52 additions & 0 deletions src/ZEO/asyncio/tests.py
@@ -1227,6 +1227,10 @@ def setUp(self):
self.transport = loop.transport
self.protocol = loop.protocol

def tearDown(self):
self.protocol.close()
self.loop.close()

def test_sm_write_message(self):
protocol, transport = self.protocol, self.transport
for i in range(2):
@@ -1255,6 +1259,46 @@ def test_sm_receive(self):
protocol.data_received(sized(msg))
self.assertEqual(self.received, msgs)

def test_sm_data_received(self):
msg = b"a test message"
data = sized(msg) * 2
receive = self.protocol.data_received
expected = [msg] * 2

def check(*split_size):
idx = 0
for size in split_size:
nidx = idx + size
receive(data[idx:nidx])
idx = nidx
receive(data[idx:])
self.assertEqual(self.received, expected)
del self.received[:]

# single chunk; i.e. split chunk
check()
# individual messages
check(len(data) // 2)
# combine from 2 chunks
check(2, 2, 2)
# combine from 3 chunks
check(5, 2)
# optimal
check(4, len(msg), 4, len(msg))

# check message processing closed the protocol
# once closed, no further data is processed
proto = self.protocol
old_receive = proto.receive

def close(msg):
proto.close()
proto.set_receive(old_receive)

proto.set_receive(close)
receive(data)
self.assertEqual(self.received, [])


def to_byte(i):
return bytes([i])
@@ -1343,6 +1387,14 @@ async def noop():
def test_asnyc_future(self):
self.check_future(self.loop.create_future(), True)

def test_repr(self):

async def noop():
pass

t = self.make_task(noop)
repr(t) # satisfied if no exception

def test_optimized_future(self):
self.check_future(Future(loop=self.loop))