Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZEO.asyncio: switch to async/await style #195

Closed
wants to merge 87 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
b577b7a
fix problems releaved by the test suite
d-maurer Apr 2, 2022
dc698a7
make doctest happy
d-maurer Apr 2, 2022
8c46a0e
- minor cleanups
dataflake Apr 4, 2022
655b1e3
- remove pypy from tox configuration [ci skip]
dataflake Apr 4, 2022
aa7dda1
limit connection waiting (to reduce test time in case of failure)
d-maurer Apr 5, 2022
b089a75
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Apr 5, 2022
d651533
Update src/ZEO/tests/ConnectionTests.py
d-maurer Apr 5, 2022
c644ba2
`uvloop` workaround
d-maurer Apr 5, 2022
2faae53
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Apr 8, 2022
7f68329
remove `msgpack` version restriction (no longer necessary as Python 2…
d-maurer Apr 8, 2022
9187165
Merge branch 'master' into asyncio3.5+
dataflake Apr 8, 2022
a9d9692
- fix merge issue [ci skip]
dataflake Apr 8, 2022
33639b9
drop tests with some storages to save test time
d-maurer Apr 9, 2022
4cacde8
bring `tox.ini` in line with `test_requires` in `setup.py`
d-maurer Apr 11, 2022
62efb98
fix connection race condition
d-maurer Apr 11, 2022
73c7871
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Apr 11, 2022
c523b85
the `asyncio` tests observe from an external thread activity controll…
d-maurer Apr 11, 2022
814cc77
fix "wait||ready-connected` race"
d-maurer Apr 12, 2022
1283aef
Test loop observation: switch from "wait(DELAY)" (prone to race condi…
d-maurer Apr 12, 2022
4e20020
make `flake8` happy
d-maurer Apr 12, 2022
937d29c
Merge branch 'master' into asyncio3.5+
dataflake Apr 12, 2022
e9360d7
make `prefetch` more reliable:
d-maurer Apr 14, 2022
65dc1b6
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Apr 14, 2022
defe92f
optimize prefetch
d-maurer Apr 14, 2022
5059a2c
Python 3.10 kills a coroutine started with `run_coroutine_threadsafe`…
d-maurer Apr 14, 2022
a1e8e66
make `flake8` happy
d-maurer Apr 14, 2022
98cecdf
Python 3.10 has removed the `loop` parameter
d-maurer Apr 14, 2022
574cc35
improve `close`
d-maurer Apr 19, 2022
c46a0da
`zeo-fan-out` race condition hunting; documentation improvements
d-maurer Apr 20, 2022
bf84637
cleanup -- now that we have understood the race condition
d-maurer Apr 20, 2022
cc3a606
improve flow control test
d-maurer Apr 20, 2022
2c5c167
redesign client close logic (with `asyncio` `close` starts the closin…
d-maurer May 6, 2022
2e27b85
Avoid additional `ResourceWarning`s (2 reamin due to the weakness of …
d-maurer May 11, 2022
504a95c
fix problems for Python before 3.8
d-maurer May 12, 2022
ebc229b
try to fix https://github.com/zopefoundation/ZEO/pull/195#issuecommen…
d-maurer May 13, 2022
1f61e73
add further closing tests
d-maurer May 16, 2022
ff8f0f7
capture log message in `client-config.test`
d-maurer May 16, 2022
72d815d
- experiment trying to prevent duplicate workflow runs
dataflake May 16, 2022
bdb31ce
- try to fix syntax
dataflake May 16, 2022
7cea602
improve log capturing: do not consider the registration failure log e…
d-maurer May 16, 2022
9a85643
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer May 16, 2022
9a543bc
uvloop workaround to prevent `close` deadlock
d-maurer May 17, 2022
09a4ece
clean up `tests_require` in `setup.py`
d-maurer May 18, 2022
868f189
Drop Python2 support
d-maurer May 18, 2022
61818b4
Allow to specify the process type of ZEO server processes with the en…
d-maurer May 19, 2022
0c23b5b
replace `mock` by `unittest.mock`
d-maurer May 19, 2022
80f3928
fix `unittest.mock` insufficiency in Python 3.5
d-maurer May 20, 2022
b17cdc4
improve test `zeo-fan-out.test`
d-maurer May 21, 2022
1728747
drop Python 3.5 support
d-maurer May 21, 2022
b3313fd
remove 3.5 from `tox.ini` as well (it will no longer work)
d-maurer May 21, 2022
7b77b2b
log flowcontrol (as it might be responsible for the "request blob" ha…
d-maurer May 22, 2022
463e0a1
Drop use of six
navytux May 23, 2022
93b1a81
Drop use of mock egg
d-maurer May 23, 2022
a586ef4
drop Python 3.5 support
d-maurer May 21, 2022
d455858
Drop use of manuel
navytux May 23, 2022
8060b6d
No longer pin msgpack < 1
navytux May 23, 2022
7763458
Merge branch 'master' into y/py3.1
navytux May 23, 2022
ff75bca
Sync with master
navytux May 23, 2022
cac3639
Sync with PR#200 (y/py3)
navytux May 23, 2022
e244529
optimization: switch back to futures which directly call (rather tha…
d-maurer May 27, 2022
e1cac43
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer May 27, 2022
214e209
fix for Python 3.7-
d-maurer May 27, 2022
c82021c
Merge branch 'master' into y/py3.1
navytux May 31, 2022
39ee68f
Sync with PR#200 (y/py3)
navytux May 31, 2022
28eaee5
Restore server_sync functionality
navytux May 31, 2022
72e6237
Optimize `SizedMessageProtocol` regarding readability and efficiency:
d-maurer Jun 1, 2022
8c9fa52
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Jun 1, 2022
1ffc106
break reference cycle introduced by `ClientStorage.sync`
d-maurer Jun 1, 2022
2f125ec
avoid (some) cyclic garbage in server implementation
d-maurer Jun 2, 2022
d432ff2
simplify (and slightly optimize) `TaskExecutor`
d-maurer Jun 6, 2022
a143d41
move cache access out of `ClientIO` into `Protocol` (to ensure that w…
d-maurer Jun 9, 2022
2d62007
`cythonize`d `SizedMessageProtocol` and future/task optimizations (`s…
d-maurer Jun 9, 2022
e370854
move cache access out of `ClientIO` into `Protocol` (to ensure that w…
d-maurer Jun 9, 2022
b137352
own `Future` implementation
d-maurer Jun 9, 2022
1a51f29
minor optimization
d-maurer Jun 9, 2022
b2b2486
minor optimization
d-maurer Jun 9, 2022
6fad76c
inline state constants -- for speed
d-maurer Jun 10, 2022
6ad732a
optimize `cython` implementation for `SizedMessageProtocol`
d-maurer Jun 10, 2022
58182b9
optimize asynchronous calls to the server:
d-maurer Jun 11, 2022
b77847f
simplification and cleanup
d-maurer Jun 13, 2022
2ecbd41
provide `cython` optimizations
d-maurer Jun 13, 2022
163e85d
update `CHANGES.rst`
d-maurer Jun 14, 2022
9bb7c08
Python 3.6 compatibility
d-maurer Jun 14, 2022
afae7a5
optimization: suggest thread switches at main <-> IO thread boundaries;
d-maurer Jun 16, 2022
ac97508
fix doctest
d-maurer Jun 16, 2022
d41902b
optimization: suggest thread switches via `sleep(1us)` (rather than `…
d-maurer Jun 16, 2022
30e271b
optimization: do not suggest a thread switch for asynchronous calls: …
d-maurer Jun 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
own Future implementation
d-maurer committed Jun 9, 2022

Verified

This commit was signed with the committer’s verified signature.
theseion Max Leske
commit b137352356bf66075a4cb12851e4854e535028a5
267 changes: 160 additions & 107 deletions src/ZEO/asyncio/optimize.py
Original file line number Diff line number Diff line change
@@ -7,151 +7,204 @@
This module defines variants which run callbacks immediately.
"""

from asyncio import CancelledError
from asyncio import CancelledError, InvalidStateError, get_event_loop
from asyncio.base_tasks import _task_repr_info
from asyncio.futures import _PyFuture as PyFuture
from concurrent.futures import Future as ConcurrentFuture


class Future(PyFuture):
"""a ``Future`` calling (rather than scheduling) callbacks.
# ``Future`` states
PENDING = 0
RESULT = 1
EXCEPTION = 2
CANCELLED = 3

Context provided for callbacks is ignored.
"""
def __schedule_callbacks(self):
# make this empty to avoid the scheduling
return

_schedule_callbacks = __schedule_callbacks # for older versions

def _call_callbacks(self):
"""replacement for ``__schedule_callbacks``, calling directly."""
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
for callback in callbacks:
callback(self)
class Future:
""" Minimal mostly ``asyncio`` compatible future.

In contrast to an ``asyncio`` future,
callbacks are called immediately, not scheduled;
their context is ignored.
"""
__slots__ = ("loop", "state", "_result", "callbacks",
"_asyncio_future_blocking")

def __init__(self, loop=None):
self.loop = loop if loop is not None else get_event_loop()
self.state = PENDING
self._result = None
self.callbacks = []
self._asyncio_future_blocking = False

def get_loop(self):
return self.loop

def cancel(self, msg=None):
if super().cancel(): # older versions do support ``msg``
self._call_callbacks()
return True
return False

def set_exception(self, exception):
super().set_exception(exception)
self._call_callbacks()
"""cancel the future if not done.

def set_result(self, result):
super().set_result(result)
self._call_callbacks()

def add_done_callback(self, fn, *, context=None):
"""Add a callback to be run when the future becomes done.
Return ``True``, if really cancelled.

The callback is called with a single argument - the future object. If
the future is already done when this is called, the callback
is called immediately.

ATT: ``context`` is ignored
*msg* is ignored.
"""
if self.done():
fn(self)
if self.state:
return False
self.state = CANCELLED
self._result = CancelledError()
self.call_callbacks()
return True

def cancelled(self):
return self.state == CANCELLED

def done(self):
return self.state

def result(self):
if self.state == PENDING:
raise InvalidStateError("not done")
elif self.state == RESULT:
return self._result
else:
self._callbacks.append(fn)
raise self._result

def remove_done_callback(self, fn):
"""Remove all instances of a callback from the "call when done" list.
def exception(self):
if self.state == PENDING:
raise InvalidStateError("not done")
elif self.state == RESULT:
return None
else:
return self._result

Returns the number of callbacks removed.
"""
filtered_callbacks = [f for f in self._callbacks if f != fn]
removed_count = len(self._callbacks) - len(filtered_callbacks)
if removed_count:
self._callbacks[:] = filtered_callbacks
return removed_count
def add_done_callback(self, cb, context=None):
if not self.state or self.callbacks:
self.callbacks.append(cb)
else:
cb(self)

def remove_done_callback(self, cb):
if self.state and self.callbacks:
raise NotImplementedError("cannot remove callbacks when done")
flt = [c for c in self.callbacks if c != cb]
rv = len(self.callbacks) - len(flt)
if rv:
self.callbacks[:] = flt
return rv

def call_callbacks(self):
for cb in self.callbacks: # allows ``callbacks`` to grow
cb(self)
del self.callbacks[:]

def set_result(self, result):
if self.state:
raise InvalidStateError("already done")
self.state = RESULT
self._result = result
self.call_callbacks()

def set_exception(self, exc):
if self.state:
raise InvalidStateError("already done")
if isinstance(exc, type):
exc = exc()
self.state = EXCEPTION
self._result = exc
self.call_callbacks()

def __await__(self):
if not self.state:
self._asyncio_future_blocking = True
yield self
return self.result()

__iter__ = __await__

def __str__(self):
cls = self.__class__
info = [cls.__module__ + "." + cls.__name__,
("PENDING", "RESULT", "EXCEPTION", "CANCELLED")[self.state],
self._result,
self.callbacks]
return " ".join(str(x) for x in info)


class CoroutineExecutor:
"""Mixin to provide simplified ``task`` essentials.
"""Execute a coroutine on behalf of a task.

No context support.

No ``cancel`` support (for the moment).
"""
slots = "coro", "task", "awaiting"

# to be defined by derived classes
# ASYNC = None

def __init__(self, coro, loop=None):
self._loop = loop
self._must_cancel = False
self._fut_waiter = None
self._coro = coro
if self.ASYNC:
super().__init__(loop=loop)
self._step()
else:
super().__init__()
self._loop.call_soon_threadsafe(self._step)

_repr_info = _task_repr_info

def get_name(self):
return "ZEO optimized task"
def __init__(self, task, coro):
self.task = task # likely creates a reference cycle
self.coro = coro

def cancel(self, msg=None):
raise NotImplementedError

def _step(self, exc=None):
"""run coroutine until next ``await`` or completion."""
assert not self.done()
coro = self._coro
self._fut_waiter = None
def step(self):
self.awaiting = None
try:
if exc is None:
result = coro.send(None)
result = self.coro.send(None)
except BaseException as e:
# we are done
task = self.task
self.task = None # break reference cycle
if isinstance(e, StopIteration):
task.set_result(e.value)
elif isinstance(e, CancelledError):
task._cancel()
else:
result = coro.throw(exc)
except StopIteration as exc:
super().set_result(exc.value)
except CancelledError as exc:
exc.__traceback__ = None # avoid cyclic garbage
super().cancel() # I.e., Future.cancel(self).
except (KeyboardInterrupt, SystemExit) as exc:
super().set_exception(exc)
raise
except BaseException as exc:
super().set_exception(exc)
task.set_exception(e)
if isinstance(e, (KeyboardInterrupt, SystemExit)):
raise
else:
blocking = getattr(result, '_asyncio_future_blocking', None)
assert blocking
assert getattr(result, '_asyncio_future_blocking', None)
result._asyncio_future_blocking = False
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel():
self._must_cancel = False
self.awaiting = result

@result.add_done_callback
def wakeup(unused, step=self._step):
def wakeup(unused, step=self.step):
step()
finally:
self = None # Needed to break cycles when an exception occurs.

def cancel(self):
raise NotImplementedError


class AsyncTask(Future):
"""Simplified ``asyncio.Task``.

Steps are not scheduled but executed immediately.
"""
__slots__ = "executor",

def __init__(self, coro, loop=None):
super().__init__(loop=loop)
self.executor = CoroutineExecutor(self, coro) # reference cycle
self.executor.step()

def cancel(self, msg=None):
return self.executor.cancel()

def _cancel(self):
return super().cancel()

class AsyncTask(CoroutineExecutor, Future):
"""simplified ``asyncio.Task``

class ConcurrentTask(ConcurrentFuture):
"""Task reporting to ``ConcurrentFuture``.

Steps are not scheduled but executed immediately.
"""
ASYNC = True

def __init__(self, coro, loop):
super().__init__()
self.executor = CoroutineExecutor(self, coro) # reference cycle
loop.call_soon_threadsafe(self.executor.step)

class ConcurrentTask(CoroutineExecutor, ConcurrentFuture):
"""Concurrent task"""
ASYNC = False
def cancel(self, msg=None):
return self.executor.cancel()

# Note: might need to redefine ``_repr_info``
def _cancel(self):
return super().cancel()


run_coroutine_threadsafe = ConcurrentTask