Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZEO.asyncio: switch to async/await style #195

Closed
wants to merge 87 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
b577b7a
fix problems releaved by the test suite
d-maurer Apr 2, 2022
dc698a7
make doctest happy
d-maurer Apr 2, 2022
8c46a0e
- minor cleanups
dataflake Apr 4, 2022
655b1e3
- remove pypy from tox configuration [ci skip]
dataflake Apr 4, 2022
aa7dda1
limit connection waiting (to reduce test time in case of failure)
d-maurer Apr 5, 2022
b089a75
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Apr 5, 2022
d651533
Update src/ZEO/tests/ConnectionTests.py
d-maurer Apr 5, 2022
c644ba2
`uvloop` workaround
d-maurer Apr 5, 2022
2faae53
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Apr 8, 2022
7f68329
remove `msgpack` version restriction (no longer necessary as Python 2…
d-maurer Apr 8, 2022
9187165
Merge branch 'master' into asyncio3.5+
dataflake Apr 8, 2022
a9d9692
- fix merge issue [ci skip]
dataflake Apr 8, 2022
33639b9
drop tests with some storages to save test time
d-maurer Apr 9, 2022
4cacde8
bring `tox.ini` in line with `test_requires` in `setup.py`
d-maurer Apr 11, 2022
62efb98
fix connection race condition
d-maurer Apr 11, 2022
73c7871
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Apr 11, 2022
c523b85
the `asyncio` tests observe from an external thread activity controll…
d-maurer Apr 11, 2022
814cc77
fix "wait||ready-connected` race"
d-maurer Apr 12, 2022
1283aef
Test loop observation: switch from "wait(DELAY)" (prone to race condi…
d-maurer Apr 12, 2022
4e20020
make `flake8` happy
d-maurer Apr 12, 2022
937d29c
Merge branch 'master' into asyncio3.5+
dataflake Apr 12, 2022
e9360d7
make `prefetch` more reliable:
d-maurer Apr 14, 2022
65dc1b6
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Apr 14, 2022
defe92f
optimize prefetch
d-maurer Apr 14, 2022
5059a2c
Python 3.10 kills a coroutine started with `run_coroutine_threadsafe`…
d-maurer Apr 14, 2022
a1e8e66
make `flake8` happy
d-maurer Apr 14, 2022
98cecdf
Python 3.10 has removed the `loop` parameter
d-maurer Apr 14, 2022
574cc35
improve `close`
d-maurer Apr 19, 2022
c46a0da
`zeo-fan-out` race condition hunting; documentation improvements
d-maurer Apr 20, 2022
bf84637
cleanup -- now that we have understood the race condition
d-maurer Apr 20, 2022
cc3a606
improve flow control test
d-maurer Apr 20, 2022
2c5c167
redesign client close logic (with `asyncio` `close` starts the closin…
d-maurer May 6, 2022
2e27b85
Avoid additional `ResourceWarning`s (2 reamin due to the weakness of …
d-maurer May 11, 2022
504a95c
fix problems for Python before 3.8
d-maurer May 12, 2022
ebc229b
try to fix https://github.com/zopefoundation/ZEO/pull/195#issuecommen…
d-maurer May 13, 2022
1f61e73
add further closing tests
d-maurer May 16, 2022
ff8f0f7
capture log message in `client-config.test`
d-maurer May 16, 2022
72d815d
- experiment trying to prevent duplicate workflow runs
dataflake May 16, 2022
bdb31ce
- try to fix syntax
dataflake May 16, 2022
7cea602
improve log capturing: do not consider the registration failure log e…
d-maurer May 16, 2022
9a85643
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer May 16, 2022
9a543bc
uvloop workaround to prevent `close` deadlock
d-maurer May 17, 2022
09a4ece
clean up `tests_require` in `setup.py`
d-maurer May 18, 2022
868f189
Drop Python2 support
d-maurer May 18, 2022
61818b4
Allow to specify the process type of ZEO server processes with the en…
d-maurer May 19, 2022
0c23b5b
replace `mock` by `unittest.mock`
d-maurer May 19, 2022
80f3928
fix `unittest.mock` insufficiency in Python 3.5
d-maurer May 20, 2022
b17cdc4
improve test `zeo-fan-out.test`
d-maurer May 21, 2022
1728747
drop Python 3.5 support
d-maurer May 21, 2022
b3313fd
remove 3.5 from `tox.ini` as well (it will no longer work)
d-maurer May 21, 2022
7b77b2b
log flowcontrol (as it might be responsible for the "request blob" ha…
d-maurer May 22, 2022
463e0a1
Drop use of six
navytux May 23, 2022
93b1a81
Drop use of mock egg
d-maurer May 23, 2022
a586ef4
drop Python 3.5 support
d-maurer May 21, 2022
d455858
Drop use of manuel
navytux May 23, 2022
8060b6d
No longer pin msgpack < 1
navytux May 23, 2022
7763458
Merge branch 'master' into y/py3.1
navytux May 23, 2022
ff75bca
Sync with master
navytux May 23, 2022
cac3639
Sync with PR#200 (y/py3)
navytux May 23, 2022
e244529
optimization: switch back to futures which directly call (rather tha…
d-maurer May 27, 2022
e1cac43
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer May 27, 2022
214e209
fix for Python 3.7-
d-maurer May 27, 2022
c82021c
Merge branch 'master' into y/py3.1
navytux May 31, 2022
39ee68f
Sync with PR#200 (y/py3)
navytux May 31, 2022
28eaee5
Restore server_sync functionality
navytux May 31, 2022
72e6237
Optimize `SizedMessageProtocol` regarding readability and efficiency:
d-maurer Jun 1, 2022
8c9fa52
Merge branch 'asyncio3.5+' of https://github.com/zopefoundation/ZEO i…
d-maurer Jun 1, 2022
1ffc106
break reference cycle introduced by `ClientStorage.sync`
d-maurer Jun 1, 2022
2f125ec
avoid (some) cyclic garbage in server implementation
d-maurer Jun 2, 2022
d432ff2
simplify (and slightly optimize) `TaskExecutor`
d-maurer Jun 6, 2022
a143d41
move cache access out of `ClientIO` into `Protocol` (to ensure that w…
d-maurer Jun 9, 2022
2d62007
`cythonize`d `SizedMessageProtocol` and future/task optimizations (`s…
d-maurer Jun 9, 2022
e370854
move cache access out of `ClientIO` into `Protocol` (to ensure that w…
d-maurer Jun 9, 2022
b137352
own `Future` implementation
d-maurer Jun 9, 2022
1a51f29
minor optimization
d-maurer Jun 9, 2022
b2b2486
minor optimization
d-maurer Jun 9, 2022
6fad76c
inline state constants -- for speed
d-maurer Jun 10, 2022
6ad732a
optimize `cython` implementation for `SizedMessageProtocol`
d-maurer Jun 10, 2022
58182b9
optimize asynchronous calls to the server:
d-maurer Jun 11, 2022
b77847f
simplification and cleanup
d-maurer Jun 13, 2022
2ecbd41
provide `cython` optimizations
d-maurer Jun 13, 2022
163e85d
update `CHANGES.rst`
d-maurer Jun 14, 2022
9bb7c08
Python 3.6 compatibility
d-maurer Jun 14, 2022
afae7a5
optimization: suggest thread switches at main <-> IO thread boundaries;
d-maurer Jun 16, 2022
ac97508
fix doctest
d-maurer Jun 16, 2022
d41902b
optimization: suggest thread switches via `sleep(1us)` (rather than `…
d-maurer Jun 16, 2022
30e271b
optimization: do not suggest a thread switch for asynchronous calls: …
d-maurer Jun 18, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
Changelog
=========

6.0 (unreleased)
----------------

- Drop Python 2 support

- Reimplement and streamline the ``asyncio`` part of the ``ClientStorage``
implementation:
- switch from futures with explicit callbacks to `async/await` style
- use standard ``asyncio`` features to implement timeouts
- redesign the API of the class implementing the ZEO client protocol
- significantly improve source documentation

- Drop credentials support: the corresponding ``ClientStorage.__init__``
parameters (i.e. ``credentials``, ``username``, ``password``)
are retained but ignored.
Note: the ZEO 5 server never supported credentials; the feature
has previously been retained for the use case
"ZEO 5 client with ZEO 4 server".


5.3.1 (unreleased)
------------------

Expand All @@ -21,6 +41,26 @@ Changelog



5.3.0 (2022-03-24)
------------------

- Add ``ConflictError`` to the list of unlogged server exceptions
(the client/its application should determine whether it wants
them logged).

Prevent ``no current transaction: tpc_abort()`` server log entries.
The storage API allows ``tpc_abort`` to be called with an
invalid transaction (the call should be ignored in this case)
and the server's ``tpc_vote`` relies on this.

Change the server's log message label for request exceptions
from ``Bad request ...`` to ``... raised exception:``,
hinting towards a server rather than client problem.

See `issue 156 <https://github.com/zopefoundation/ZEO/issues/156>`_.



5.3.0 (2022-03-24)
------------------

Expand Down Expand Up @@ -105,7 +145,7 @@ Changelog

- Fixed to work with some changes made in ZODB 5.4.0.

Client-side updates are incuded for ZODB 5.4.0 or databases that
Client-side updates are included for ZODB 5.4.0 or databases that
already had ``zodbpickle.binary`` OIDs. See `issue 113
<https://github.com/zopefoundation/ZEO/issues/113>`_.

Expand Down
6 changes: 1 addition & 5 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,13 @@
'ZODB >= 5.5.1',
'zope.testing',
'manuel',
'random2',
'mock',
'msgpack < 1',
d-maurer marked this conversation as resolved.
Show resolved Hide resolved
'zope.testrunner',
]

classifiers = """
Intended Audience :: Developers
License :: OSI Approved :: Zope Public License
Programming Language :: Python :: 2
Programming Language :: Python :: 2.7
Programming Language :: Python :: 3
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6
Expand Down Expand Up @@ -158,5 +154,5 @@ def emit(self, record):
zeo-nagios = ZEO.nagios:main
""",
include_package_data=True,
python_requires='>=2.7.9,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*',
python_requires='>=3.5.2',
)
94 changes: 66 additions & 28 deletions src/ZEO/ClientStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def __init__(self, addr, storage='1', cache_size=20 * MB,
This is typically invoked from a custom_zodb.py file.

All arguments except addr should be keyword arguments.

Arguments:

addr
Expand All @@ -118,6 +119,20 @@ def __init__(self, addr, storage='1', cache_size=20 * MB,
connection. A hostname may be a DNS name or a dotted IP
address. Required.

All addresses are assumed to serve (essentially)
the same (potentially replicated) storage.
A connection tries to connect to those addresses;
the first successful connection establishment with
the called for ("read_only" or "writable") capabilities
is selected and used for storage interaction until
the connection is lost. In that case, a
reconnection is tried.
If ``ClientStorage`` calls for the "writable" capability
but allows for a "read only" fallback,,
a read only connection can be used as a fallback;
if a writable connection becomes available later, a
switch to this connection is performed.

storage
The server storage name, defaulting to '1'. The name must
match one of the storage names supported by the server(s)
Expand All @@ -132,14 +147,12 @@ def __init__(self, addr, storage='1', cache_size=20 * MB,
address and the server storage name. This is used to
construct the response to getName()

cache
A cache object or a name, relative to the current working
directory, used to construct persistent cache filenames.
Defaults to None, in which case the cache is not
persistent. See ClientCache for more info.

wait_timeout
Maximum time to wait for results, including connecting.
Maximum time (seconds) to wait for connections,
defaulting to 30.
Note: the timeout applies only to [re]connect.
Normal operations can take arbitrary long. This
is important for long running operations, such as ``pack``.

read_only
A flag indicating whether this should be a
Expand All @@ -160,7 +173,7 @@ def __init__(self, addr, storage='1', cache_size=20 * MB,
shared_blob_dir
Flag whether the blob_dir is a server-shared filesystem
that should be used instead of transferring blob data over
ZEO protocol.
the ZEO protocol.

blob_cache_size
Maximum size of the ZEO blob cache, in bytes. If not set, then
Expand All @@ -170,22 +183,49 @@ def __init__(self, addr, storage='1', cache_size=20 * MB,
This option is ignored if shared_blob_dir is true.

blob_cache_size_check
ZEO check size as percent of blob_cache_size. The ZEO
Cache check size as percent of blob_cache_size. The ZEO
cache size will be checked when this many bytes have been
loaded into the cache. Defaults to 10% of the blob cache
size. This option is ignored if shared_blob_dir is true.

client_label
A label to include in server log messages for the client.

Note that the authentication protocol is defined by the server
and is detected by the ClientStorage upon connecting (see
testConnection() and doAuth() for details).

cache
A cache object or a file path (relative or absolute).
Defaults to None, in which case the cache is determined
from client and var.

ssl
An ssl client context (i.e. with purpose "ServerAuth")
to call for SSL connections.

ssl_server_hostname
The server hostname - used during the SSL authentication check

client
var
If cache is None, client determines the cache:
if it is None, then a non persisent cache is used;
otherwie, client is used together with var (defaults
to the current working directory) to construct the
file path for the persistent cache file

wait
Wait for server connection, defaulting to true.

credentials
username
password
realm
disconnect_poll
min_disconnect_poll
max_disconnect_poll
drop_cache_rather_verify
server_sync
ignored; retained (as parameters) for compatibility
"""

assert not username or password or realm

if isinstance(addr, int):
addr = ('127.0.0.1', addr)

Expand Down Expand Up @@ -260,14 +300,11 @@ def __init__(self, addr, storage='1', cache_size=20 * MB,
blob_cache_size * blob_cache_size_check // 100)
self._check_blob_size()

self.server_sync = server_sync

self._server = _client_factory(
addr, self, cache, storage,
ZEO.asyncio.client.Fallback if read_only_fallback else read_only,
wait_timeout or 30,
ssl = ssl, ssl_server_hostname=ssl_server_hostname,
credentials=credentials,
)
self._call = self._server.call
self._async = self._server.async_
Expand All @@ -280,7 +317,7 @@ def __init__(self, addr, storage='1', cache_size=20 * MB,
try:
self._wait()
except Exception:
# No point in keeping the server going of the storage
# No point in keeping the server going if the storage
# creation fails
self._server.close()
raise
Expand Down Expand Up @@ -368,8 +405,7 @@ def notify_connected(self, conn, info):
self._connection_generation += 1

if self._client_label:
conn.call_async_from_same_thread(
'set_client_label', self._client_label)
conn.call_async('set_client_label', self._client_label)

self._info.update(info)

Expand All @@ -386,12 +422,10 @@ def notify_connected(self, conn, info):
zope.interface.alsoProvides(self, iface)

if self.protocol_version[1:] >= b'5':
self.ping = lambda : self._call('ping')
self.ping = lambda : self._call('ping', timeout=0)
else:
self.ping = lambda : self._call('lastTransaction')
self.ping = lambda : self._call('lastTransaction', timeout=0)

if self.server_sync:
self.sync = self.ping

def set_server_addr(self, addr):
# Normalize server address and convert to string
Expand Down Expand Up @@ -863,7 +897,10 @@ def tpc_abort(self, txn, timeout=None):
# wait a little while in hopes of reconnecting. If
# we're able to reconnect and retry the transaction,
# ten it might succeed!
self._call('tpc_abort', id(txn), timeout=timeout)
if timeout is not None:
self._call('tpc_abort', id(txn), timeout=timeout)
else:
self._call('tpc_abort', id(txn))
except ClientDisconnected:
logger.debug("%s ClientDisconnected in tpc_abort() ignored",
self.__name__)
Expand Down Expand Up @@ -1034,8 +1071,9 @@ def _iterator_gc(self, disconnected=False):
return self._iterator_gc(True)
self._iterator_ids -= iids

def server_status(self):
return self._call('server_status')
def server_status(self, **kw):
"""supported keywords: ``timeout``."""
return self._call('server_status', **kw)

class TransactionIterator(object):

Expand Down
8 changes: 4 additions & 4 deletions src/ZEO/StorageServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import zope.interface
import six

from ZEO._compat import Pickler, Unpickler, PY3, BytesIO
from ZEO._compat import Pickler, Unpickler, BytesIO
from ZEO.Exceptions import AuthError
from ZEO.monitor import StorageStats
from ZEO.asyncio.server import Delay, MTDelay, Result
Expand Down Expand Up @@ -623,6 +623,7 @@ def ruok(self):
def ping(self):
pass


class StorageServerDB(object):
"""Adapter from StorageServerDB to ZODB.interfaces.IStorageWrapper

Expand Down Expand Up @@ -949,9 +950,8 @@ def server_status(self, storage_id):
status['timeout-thread-is-alive'] = lock_manager.timeout.is_alive()
last_transaction = self.storages[storage_id].lastTransaction()
last_transaction_hex = codecs.encode(last_transaction, 'hex_codec')
if PY3:
# doctests and maybe clients expect a str, not bytes
last_transaction_hex = str(last_transaction_hex, 'ascii')
# doctests and maybe clients expect a str, not bytes
last_transaction_hex = str(last_transaction_hex, 'ascii')
status['last-transaction'] = last_transaction_hex
return status

Expand Down
46 changes: 11 additions & 35 deletions src/ZEO/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,50 +16,26 @@
import sys
import platform

PY3 = sys.version_info[0] >= 3
PY32 = sys.version_info[:2] == (3, 2)
PYPY = getattr(platform, 'python_implementation', lambda: None)() == 'PyPy'
WIN = sys.platform.startswith('win')

if PY3:
from zodbpickle.pickle import Pickler, Unpickler as _Unpickler, dump, dumps, loads
class Unpickler(_Unpickler):
# Py3: Python 3 doesn't allow assignments to find_global,
# instead, find_class can be overridden
from zodbpickle.pickle import Pickler, Unpickler as _Unpickler, dump, dumps, loads
class Unpickler(_Unpickler):
# Python 3 doesn't allow assignments to find_global,
# instead, find_class can be overridden

find_global = None
find_global = None

def find_class(self, modulename, name):
if self.find_global is None:
return super(Unpickler, self).find_class(modulename, name)
return self.find_global(modulename, name)
else:
try:
import zodbpickle.fastpickle as cPickle
except ImportError:
import zodbpickle.pickle as cPickle
Pickler = cPickle.Pickler
Unpickler = cPickle.Unpickler
dump = cPickle.dump
dumps = cPickle.dumps
loads = cPickle.loads
def find_class(self, modulename, name):
if self.find_global is None:
return super(Unpickler, self).find_class(modulename, name)
return self.find_global(modulename, name)

# String and Bytes IO
from ZODB._compat import BytesIO

if PY3:

import _thread as thread
if PY32:
from threading import _get_ident as get_ident
else:
from threading import get_ident


else:

import thread
from thread import get_ident
import _thread as thread
from threading import get_ident

try:
from cStringIO import StringIO
Expand Down
Loading