From efd44fbbd16d863f24b421ef45e817e88fae23cd Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Sat, 4 May 2024 16:29:14 -0400 Subject: [PATCH] refactor: habitability, pytest idioms --- zodburi/__init__.py | 34 +- zodburi/datatypes.py | 68 +- zodburi/tests/test___init__.py | 2 +- zodburi/tests/test_datatypes.py | 170 ++-- zodburi/tests/test_resolvers.py | 1404 ++++++++++++++++--------------- 5 files changed, 907 insertions(+), 771 deletions(-) diff --git a/zodburi/__init__.py b/zodburi/__init__.py index d58275a..d9c7a7c 100644 --- a/zodburi/__init__.py +++ b/zodburi/__init__.py @@ -14,37 +14,41 @@ ) BYTES_PARAMETERS = ( - 'cache_size_bytes', - 'historical_cache_size_bytes', - 'large_record_size' + "cache_size_bytes", + "historical_cache_size_bytes", + "large_record_size" ) PARAMETERS = dict( - [('database_name', 'database_name')] + - [(f'connection_{parm}', parm) for parm in CONNECTION_PARAMETERS] + [("database_name", "database_name")] + + [(f"connection_{parm}", parm) for parm in CONNECTION_PARAMETERS] ) -HAS_UNITS_RE = re.compile(r'\s*(\d+)\s*([kmg])b\s*$') +HAS_UNITS_RE = re.compile(r"\s*(\d+)\s*([kmg])b\s*$") UNITS = dict(k=1<<10, m=1<<20, g=1<<30) + _DEFAULT_DBKW = { - 'cache_size': 10000, - 'pool_size': 7, - 'database_name': 'unnamed', + "cache_size": 10000, + "pool_size": 7, + "database_name": "unnamed", } + class NoResolverForScheme(KeyError): def __init__(self, uri): self.uri = uri - super().__init__(f'No resolver found for uri: {uri}') + super().__init__(f"No resolver found for uri: {uri}") + class UnknownDatabaseKeywords(KeyError): def __init__(self, kw): self.kw = kw super().__init__( - f'Unrecognized database keyword(s): {", ".join(kw)}' + f"Unrecognized database keyword(s): {", ".join(kw)}" ) + def resolve_uri(uri): """ Returns a tuple, (factory, dbkw) where factory is a no-arg callable which @@ -54,9 +58,10 @@ def resolve_uri(uri): factory, dbkw = _resolve_uri(uri) return factory, _get_dbkw(dbkw) + # _resolve_uri serves resolve_uri: it returns factory and original raw dbkw. def _resolve_uri(uri): - scheme = uri[:uri.find(':')] + scheme = uri[:uri.find(":")] try: resolver_eps = entry_points(group="zodburi.resolvers") except TypeError: # pragma: NO COVER Python < 3.10 @@ -70,21 +75,24 @@ def _resolve_uri(uri): else: raise NoResolverForScheme(uri) + def _parse_bytes(s): m = HAS_UNITS_RE.match(s.lower()) + if m: v, uname = m.group(1, 2) return int(v) * UNITS[uname] else: return int(s) + def _get_dbkw(kw): dbkw = _DEFAULT_DBKW.copy() for parameter in PARAMETERS: if parameter in kw: v = kw.pop(parameter) - if parameter.startswith('connection_'): + if parameter.startswith("connection_"): if not isinstance(v, int): if PARAMETERS[parameter] in BYTES_PARAMETERS: v = _parse_bytes(v) diff --git a/zodburi/datatypes.py b/zodburi/datatypes.py index cfeed9b..c039058 100644 --- a/zodburi/datatypes.py +++ b/zodburi/datatypes.py @@ -1,43 +1,67 @@ TRUETYPES = ('1', 'on', 'true', 't', 'yes') FALSETYPES = ('', '0', 'off', 'false', 'f', 'no') + +class SuffixLengthMismatch(ValueError): + def __init__(self, d): + self.d = d + super().__init__("All suffix keys must have the same length") + + class SuffixMultiplier: - # d is a dictionary of suffixes to integer multipliers. If no suffixes - # match, default is the multiplier. Matches are case insensitive. Return - # values are in the fundamental unit. + """Convert integer-like strings w/ size suffixes to integers + + - 'd' is a dictionary of suffixes to integer multipliers. + - 'default' is the multiplier if no suffixes match. + + Matches are case insensitive. + + Returned values are in the fundamental unit. + """ def __init__(self, d, default=1): - self._d = d - self._default = default # all keys must be the same size - self._keysz = None - for k in d.keys(): - if self._keysz is None: - self._keysz = len(k) - else: - if self._keysz != len(k): - raise ValueError('suffix length missmatch') + sizes = set(len(key) for key in d) + + if len(sizes) > 1: + raise SuffixLengthMismatch(d) + + self._d = {key.lower(): value for key, value in d.items()} + self._default = default + self._keysz = sizes.pop() if sizes else 0 def __call__(self, v): - v = v.lower() - for s, m in self._d.items(): - if v[-self._keysz:] == s: - return int(v[:-self._keysz]) * m - return int(v) * self._default + if self._keysz and len(v) > self._keysz: + v = v.lower() + suffix = v[-self._keysz:] + multiplier = self._d.get(suffix, self._default) + + if multiplier is not self._default: + v = v[:-self._keysz] + else: + multiplier = self._default + + return int(v) * multiplier + -convert_bytesize = SuffixMultiplier({'kb': 1024, - 'mb': 1024*1024, - 'gb': 1024*1024*1024, - }) +convert_bytesize = SuffixMultiplier({ + "kb": 1024, + "mb": 1024*1024, + "gb": 1024*1024*1024, +}) def convert_int(value): # boolean values are also treated as integers value = value.lower() + if value in FALSETYPES: return 0 + if value in TRUETYPES: return 1 + return int(value) + def convert_tuple(value): - return tuple(value.split(',')) + return tuple(value.split(",")) diff --git a/zodburi/tests/test___init__.py b/zodburi/tests/test___init__.py index 0c7d38e..7a6915a 100644 --- a/zodburi/tests/test___init__.py +++ b/zodburi/tests/test___init__.py @@ -3,7 +3,6 @@ import pytest import zodburi -from zodburi import resolvers @pytest.mark.parametrize("source, expected", [ @@ -24,6 +23,7 @@ def _expected_dbkw(**kw): dbkw.update(kw) return dbkw + @pytest.mark.parametrize("kw, expected", [ ({}, _expected_dbkw()), ({"database_name": "foo"}, _expected_dbkw(database_name="foo")), diff --git a/zodburi/tests/test_datatypes.py b/zodburi/tests/test_datatypes.py index 3862721..289d33e 100644 --- a/zodburi/tests/test_datatypes.py +++ b/zodburi/tests/test_datatypes.py @@ -1,102 +1,126 @@ import unittest +import pytest + _marker = object() -class SuffixMultiplierTests(unittest.TestCase): - def _getTargetClass(self): - from zodburi.datatypes import SuffixMultiplier - return SuffixMultiplier - def _makeOne(self, d=None, default=_marker): - if d is None: - d = {} - if default is _marker: - return self._getTargetClass()(d) - return self._getTargetClass()(d, default) +def _suffix_multiplier(d=None, default=_marker): + from zodburi.datatypes import SuffixMultiplier + + if d is None: + d = {} + + if default is _marker: + return SuffixMultiplier(d) + + return SuffixMultiplier(d, default) + + +def test_suffixmultiplier___init___w_defaults(): + sm = _suffix_multiplier() + assert sm._d == {} + assert sm._default == 1 + assert sm._keysz == 0 + + +def test_suffixmultiplier___init___w_explicit_default(): + sm = _suffix_multiplier(default=3) + assert sm._d == {} + assert sm._default == 3 + assert sm._keysz == 0 + + +def test_suffixmultiplier___init___w_normal_suffixes(): + SFX = {"aaa": 2, "bbb": 3} + sm = _suffix_multiplier(SFX) + assert sm._d == SFX + assert sm._default == 1 + assert sm._keysz == 3 + + +def test_suffixmultiplier___init___w_mismatched_suffixes(): + SFX = {"aaa": 2, "bbbb": 3} + + with pytest.raises(ValueError): + _suffix_multiplier(SFX) + + +def test_suffixmultiplier___call____miss(): + SFX = {"aaa": 2, "bbb": 3} + sm = _suffix_multiplier(SFX) + assert sm("14") == 14 + + +def test_suffixmultiplier___call___hit(): + SFX = {"aaa": 2, "bbb": 3} + sm = _suffix_multiplier(SFX) + assert sm("14bbb") == 42 + + +def test_convert_bytesize_miss(): + from zodburi.datatypes import convert_bytesize + + assert convert_bytesize("14") == 14 - def test_ctor_simple(self): - sm = self._makeOne() - self.assertEqual(sm._d, {}) - self.assertEqual(sm._default, 1) - self.assertEqual(sm._keysz, None) - def test_ctor_w_explicit_default(self): - sm = self._makeOne(default=3) - self.assertEqual(sm._default, 3) +@pytest.mark.parametrize("sized, expected", [ + ("14", 14), + ("200", 200), + ("14kb", 14 * 1024), + ("14mb", 14 * 1024 * 1024), + ("14gb", 14 * 1024 * 1024 * 1024), +]) +def test_convert_bytesize_hit(sized, expected): + from zodburi.datatypes import convert_bytesize - def test_ctor_w_normal_suffixes(self): - SFX = {'aaa': 2, 'bbb': 3} - sm = self._makeOne(SFX) - self.assertEqual(sm._d, SFX) - self.assertEqual(sm._default, 1) - self.assertEqual(sm._keysz, 3) + assert convert_bytesize(sized) == expected - def test_ctor_w_mismatched_suffixes(self): - SFX = {'aaa': 2, 'bbbb': 3} - self.assertRaises(ValueError, self._makeOne, SFX) - def test___call____miss(self): - SFX = {'aaa': 2, 'bbb': 3} - sm = self._makeOne(SFX) - self.assertEqual(sm('14'), 14) +def test_convert_int_w_falsetypes(): + from zodburi.datatypes import convert_int + from zodburi.datatypes import FALSETYPES - def test___call____hit(self): - SFX = {'aaa': 2, 'bbb': 3} - sm = self._makeOne(SFX) - self.assertEqual(sm('14bbb'), 42) + for v in FALSETYPES: + assert convert_int(v) == 0 + assert convert_int(v.title()) == 0 -class Test_convert_bytesize(unittest.TestCase): +def test_convert_int_w_truetypes(): + from zodburi.datatypes import convert_int + from zodburi.datatypes import TRUETYPES - def _callFUT(self, value): - from zodburi.datatypes import convert_bytesize - return convert_bytesize(value) + for v in TRUETYPES: + assert convert_int(v) == 1 + assert convert_int(v.title()) == 1 - def test_hit(self): - self.assertEqual(self._callFUT('14kb'), 14 * 1024) - self.assertEqual(self._callFUT('14mb'), 14 * 1024 * 1024) - self.assertEqual(self._callFUT('14gb'), 14 * 1024 * 1024 * 1024) - def test_miss(self): - self.assertEqual(self._callFUT('14'), 14) +def test_convert_int_w_normal(): + from zodburi.datatypes import convert_int + assert convert_int("14") == 14 -class Test_convert_int(unittest.TestCase): - def _callFUT(self, value): - from zodburi.datatypes import convert_int - return convert_int(value) +def test_convert_int_w_invalid(): + from zodburi.datatypes import convert_int - def test_hit_falsetypes(self): - from zodburi.datatypes import FALSETYPES - for v in FALSETYPES: - self.assertEqual(self._callFUT(v), 0) - self.assertEqual(self._callFUT(v.title()), 0) + with pytest.raises(ValueError): + convert_int("notanint") - def test_hit_truetypes(self): - from zodburi.datatypes import TRUETYPES - for v in TRUETYPES: - self.assertEqual(self._callFUT(v), 1) - self.assertEqual(self._callFUT(v.title()), 1) - def test_hit_normal(self): - self.assertEqual(self._callFUT('14'), 14) +def test_convert_tuple_w_empty(): + from zodburi.datatypes import convert_tuple - def test_miss(self): - self.assertRaises(ValueError, self._callFUT, 'notanint') + assert convert_tuple("") == ("",) -class Test_convert_tuple(unittest.TestCase): +def test_convert_tuple_wo_commas(): + from zodburi.datatypes import convert_tuple - def _callFUT(self, value): - from zodburi.datatypes import convert_tuple - return convert_tuple(value) + assert convert_tuple("abc") == ("abc",) - def test_empty(self): - self.assertEqual(self._callFUT(''), ('',)) - def test_wo_commas(self): - self.assertEqual(self._callFUT('abc'), ('abc',)) +def test_convert_tuple_w_commas(): + from zodburi.datatypes import convert_tuple - def test_w_commas(self): - self.assertEqual(self._callFUT('abc,def'), ('abc', 'def')) + assert convert_tuple("abc,def") == ("abc", "def") diff --git a/zodburi/tests/test_resolvers.py b/zodburi/tests/test_resolvers.py index 6c4fff2..d417030 100644 --- a/zodburi/tests/test_resolvers.py +++ b/zodburi/tests/test_resolvers.py @@ -1,682 +1,762 @@ +import contextlib from importlib.metadata import distribution +import os +import pathlib +import tempfile from unittest import mock from urllib.parse import quote import unittest import warnings import pytest +from ZEO.ClientStorage import ClientStorage +from ZODB.blob import BlobStorage +from ZODB.DemoStorage import DemoStorage +from ZODB.FileStorage import FileStorage +from ZODB.MappingStorage import MappingStorage -class Base: - - def test_interpret_kwargs_noargs(self): - resolver = self._makeOne() - kwargs = resolver.interpret_kwargs({}) - self.assertEqual(kwargs, ({}, {})) - - def test_bytesize_args(self): - resolver = self._makeOne() - names = sorted(resolver._bytesize_args) - kwargs = {} - for name in names: - kwargs[name] = '10MB' - args = resolver.interpret_kwargs(kwargs)[0] - keys = args.keys() - self.assertEqual(sorted(keys), names) - for name, value in args.items(): - self.assertEqual(value, 10*1024*1024) - - def test_int_args(self): - resolver = self._makeOne() - names = sorted(resolver._int_args) - kwargs = {} - for name in names: - kwargs[name] = '10' - args = resolver.interpret_kwargs(kwargs)[0] - keys = sorted(args.keys()) - self.assertEqual(sorted(keys), sorted(names)) - for name, value in args.items(): - self.assertEqual(value, 10) - - def test_string_args(self): - resolver = self._makeOne() - names = sorted(resolver._string_args) - kwargs = {} - for name in names: - kwargs[name] = 'string' - args = resolver.interpret_kwargs(kwargs)[0] - keys = args.keys() - self.assertEqual(sorted(keys), names) - for name, value in args.items(): - self.assertEqual(value, 'string') - - def test_float_args(self): - resolver = self._makeOne() - resolver._float_args = ('pi', 'PI') - names = sorted(resolver._float_args) - kwargs = {} - for name in names: - kwargs[name] = '3.14' - args = resolver.interpret_kwargs(kwargs)[0] - keys = args.keys() - self.assertEqual(sorted(keys), names) - for name, value in args.items(): - self.assertEqual(value, 3.14) - - def test_tuple_args(self): - resolver = self._makeOne() - resolver._tuple_args = ('foo', 'bar') - names = sorted(resolver._tuple_args) - kwargs = {} - for name in names: - kwargs[name] = 'first,second,third' - args = resolver.interpret_kwargs(kwargs)[0] - keys = args.keys() - self.assertEqual(sorted(keys), names) - for name, value in args.items(): - self.assertEqual(value, ('first', 'second', 'third')) - -class TestFileStorageURIResolver(Base, unittest.TestCase): - - def _getTargetClass(self): - from zodburi.resolvers import FileStorageURIResolver - return FileStorageURIResolver - - def _makeOne(self): - klass = self._getTargetClass() - return klass() - - def setUp(self): - import tempfile - self.tmpdir = tempfile.mkdtemp() - - def tearDown(self): - import shutil - shutil.rmtree(self.tmpdir) - - def test_bool_args(self): - resolver = self._makeOne() - f = resolver.interpret_kwargs - kwargs = f({'read_only':'1'}) - self.assertEqual(kwargs[0], {'read_only':1}) - kwargs = f({'read_only':'true'}) - self.assertEqual(kwargs[0], {'read_only':1}) - kwargs = f({'read_only':'on'}) - self.assertEqual(kwargs[0], {'read_only':1}) - kwargs = f({'read_only':'off'}) - self.assertEqual(kwargs[0], {'read_only':0}) - kwargs = f({'read_only':'no'}) - self.assertEqual(kwargs[0], {'read_only':0}) - kwargs = f({'read_only':'false'}) - self.assertEqual(kwargs[0], {'read_only':0}) - - @mock.patch('zodburi.resolvers.FileStorage') - def test_call_no_qs(self, FileStorage): - resolver = self._makeOne() - factory, dbkw = resolver('file:///tmp/foo/bar') - factory() - FileStorage.assert_called_once_with('/tmp/foo/bar') +FS_FILENAME = "db.db" +FS_BLOBDIR = "blob" - @mock.patch('zodburi.resolvers.FileStorage') - def test_call_abspath(self, FileStorage): - resolver = self._makeOne() - factory, dbkw = resolver('file:///tmp/foo/bar?read_only=true') - factory() - FileStorage.assert_called_once_with('/tmp/foo/bar', read_only=1) - @mock.patch('zodburi.resolvers.FileStorage') - def test_call_abspath_windows(self, FileStorage): - resolver = self._makeOne() - factory, dbkw = resolver( - 'file://C:\\foo\\bar?read_only=true') - factory() - FileStorage.assert_called_once_with('C:\\foo\\bar', read_only=1) +@pytest.fixture(scope="function") +def tmpdir(): + with tempfile.TemporaryDirectory() as tmp: + yield tmp + + +@pytest.fixture(scope="function") +def zconfig_tmpfile(): + with tempfile.NamedTemporaryFile() as tmp: + yield tmp + + +@pytest.fixture(scope="function") +def zconfig_path(zconfig_tmpfile): + yield pathlib.Path(zconfig_tmpfile.name) + + +def _fs_resolver(): + from zodburi.resolvers import FileStorageURIResolver + return FileStorageURIResolver() + + +def _mapping_resolver(): + from zodburi.resolvers import MappingStorageURIResolver + return MappingStorageURIResolver() + + +def _client_resolver(): + from zodburi.resolvers import ClientStorageURIResolver + return ClientStorageURIResolver() + + +def _zconfig_resolver(): + from zodburi.resolvers import ZConfigURIResolver + return ZConfigURIResolver() + +def _demo_resolver(): + from zodburi.resolvers import DemoStorageURIResolver + return DemoStorageURIResolver() + + +@pytest.mark.parametrize("factory", [_fs_resolver, _mapping_resolver]) +def test_interpret_kwargs_noargs(factory): + resolver = factory() + + new, unused = resolver.interpret_kwargs({}) + + assert new == {} + assert unused == {} + + +@pytest.mark.parametrize("factory", [_fs_resolver, _mapping_resolver]) +def test_interpret_kwargs_bytesize_args(factory): + resolver = factory() + names = sorted(resolver._bytesize_args) + kwargs = { + name: "10MB" for name in names + } + + new, unused = resolver.interpret_kwargs(kwargs) + + assert sorted(new) == names + + if new: + assert set(new.values()) == {10 * 1024 * 1024} + + +@pytest.mark.parametrize("factory", [_fs_resolver, _mapping_resolver]) +def test_interpret_kwargs_int_args(factory): + resolver = factory() + names = sorted(resolver._int_args) + kwargs = { + name: "10" for name in names + } + + new, unused = resolver.interpret_kwargs(kwargs) - @mock.patch('zodburi.resolvers.FileStorage') - def test_call_normpath(self, FileStorage): - resolver = self._makeOne() - factory, dbkw = resolver('file:///tmp/../foo/bar?read_only=true') + assert sorted(new) == sorted(new) + + if new: + assert set(new.values()) == {10} + + +@pytest.mark.parametrize("factory", [_fs_resolver, _mapping_resolver]) +def test_interpret_kwargs_string_args(factory): + resolver = factory() + names = sorted(resolver._string_args) + kwargs = { + name: "string" for name in names + } + + new, unused = resolver.interpret_kwargs(kwargs) + + assert sorted(new) == names + + if new: + assert set(new.values()) == {"string"} + + +@pytest.mark.parametrize("factory", [_fs_resolver, _mapping_resolver]) +def test_interpret_kwargs_float_args(factory): + resolver = factory() + resolver._float_args = ("pi", "PI") + names = sorted(resolver._float_args) + kwargs = { + "pi": "3.14", + "PI": "3.14", + } + + new, unussed = resolver.interpret_kwargs(kwargs) + + assert sorted(new) == names + + if new: + assert set(new.values()) == {3.14} + + +@pytest.mark.parametrize("factory", [_fs_resolver, _mapping_resolver]) +def test_interpret_kwargs_tuple_args(factory): + resolver = factory() + resolver._tuple_args = ("foo", "bar") + names = sorted(resolver._tuple_args) + kwargs = { + name: "first,second,third" for name in names + } + + new, unused = resolver.interpret_kwargs(kwargs) + + assert sorted(new) == names + + if new: + assert set(new.values()) == {("first", "second", "third")} + + +@pytest.mark.parametrize("passed, expected", [ + ("1", 1), + ("0", 0), + ("true", 1), + ("false", 0), + ("on", 1), + ("off", 0), + ("yes", 1), + ("no", 0), +]) +@pytest.mark.parametrize("factory", [ + _fs_resolver, + _client_resolver, +]) +def test_fsresolver_interpres_kwargs_bool_args(factory, passed, expected): + resolver = factory() + kwargs = {"read_only": passed} + + new, unused = resolver.interpret_kwargs(kwargs) + + assert new == {"read_only": expected} + + +@pytest.mark.parametrize("uri, expected_args, expected_kwargs", [ + ("file:///tmp/foo/bar", ("/tmp/foo/bar",), {}), + ( + "file:///tmp/foo/bar?read_only=true", + ("/tmp/foo/bar",), + {"read_only": 1} + ), + ( + "file://C:\\foo\\bar?read_only=true", + ("C:\\foo\\bar",), + {"read_only": 1} + ), + ( + "file:///tmp/../foo/bar?read_only=true", + ("/foo/bar",), + {"read_only": 1} + ), +]) +def test_fsresolver___call___mock_invoke_factory( + uri, expected_args, expected_kwargs, +): + resolver = _fs_resolver() + + factory, dbkw = resolver(uri) + assert dbkw == {} + + with mock.patch("zodburi.resolvers.FileStorage") as fs_klass: factory() - FileStorage.assert_called_once_with('/foo/bar', read_only=1) - - def test_invoke_factory_filestorage(self): - import os - from ZODB.FileStorage import FileStorage - self.assertFalse(os.path.exists(os.path.join(self.tmpdir, 'db.db'))) - resolver = self._makeOne() - factory, dbkw = resolver('file://%s/db.db?quota=200' % self.tmpdir) - storage = factory() - self.assertTrue(isinstance(storage, FileStorage)) - try: - self.assertEqual(storage._quota, 200) - self.assertTrue(os.path.exists(os.path.join(self.tmpdir, 'db.db'))) - finally: - storage.close() - - def test_invoke_factory_demostorage(self): - import os - from ZODB.DemoStorage import DemoStorage - from ZODB.FileStorage import FileStorage - resolver = self._makeOne() - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - factory, dbkw = resolver( - 'file://%s/db.db?demostorage=true' % self.tmpdir) - self.assertEqual(len(w), 1) - self.assertTrue(issubclass(w[-1].category, DeprecationWarning)) - self.assertIn("demostorage option is deprecated, use demo:// instead", str(w[-1].message)) - storage = factory() - self.assertTrue(isinstance(storage, DemoStorage)) - self.assertTrue(isinstance(storage.base, FileStorage)) - try: - self.assertEqual(dbkw, {}) - self.assertTrue(os.path.exists(os.path.join(self.tmpdir, 'db.db'))) - finally: - storage.close() - - def test_invoke_factory_blobstorage(self): - import os - from ZODB.blob import BlobStorage - DB_FILE = os.path.join(self.tmpdir, 'db.db') - BLOB_DIR = os.path.join(self.tmpdir, 'blob') - self.assertFalse(os.path.exists(DB_FILE)) - resolver = self._makeOne() + + fs_klass.assert_called_once_with(*expected_args, **expected_kwargs) + + +def test_fsresolver___call___check_dbkw(): + resolver = _fs_resolver() + factory, dbkw = resolver( + "file:///tmp/foo/bar" + "?connection_pool_size=1" + "&connection_cache_size=1" + "&database_name=dbname" + ) + + assert dbkw == { + "connection_cache_size": "1", + "connection_pool_size": "1", + "database_name": "dbname", + } + + +def test_fsresolver_invoke_factory(tmpdir): + fs_dir = pathlib.Path(tmpdir) + db_path = fs_dir / FS_FILENAME + resolver = _fs_resolver() + + factory, dbkw = resolver(f"file://{tmpdir}/{FS_FILENAME}?quota=200") + + assert dbkw == {} + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, FileStorage) + assert storage._quota == 200 + assert db_path.exists() + + +def test_fsresolver_invoke_factory_w_demostorage(tmpdir): + fs_dir = pathlib.Path(tmpdir) + db_path = fs_dir / FS_FILENAME + resolver = _fs_resolver() + + with warnings.catch_warnings(record=True) as log: factory, dbkw = resolver( - 'file://%s/db.db?quota=200' - '&blobstorage_dir=%s/blob' - '&blobstorage_layout=bushy' % (self.tmpdir, quote(self.tmpdir))) - storage = factory() - self.assertTrue(isinstance(storage, BlobStorage)) - try: - self.assertTrue(os.path.exists(DB_FILE)) - self.assertTrue(os.path.exists(BLOB_DIR)) - finally: - storage.close() - - def test_invoke_factory_blobstorage_and_demostorage(self): - import os - from ZODB.DemoStorage import DemoStorage - DB_FILE = os.path.join(self.tmpdir, 'db.db') - BLOB_DIR = os.path.join(self.tmpdir, 'blob') - self.assertFalse(os.path.exists(DB_FILE)) - resolver = self._makeOne() - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - factory, dbkw = resolver( - 'file://%s/db.db?quota=200&demostorage=true' - '&blobstorage_dir=%s/blob' - '&blobstorage_layout=bushy' % ( - self.tmpdir, quote( - self.tmpdir - ))) - self.assertEqual(len(w), 1) - self.assertTrue(issubclass(w[-1].category, DeprecationWarning)) - self.assertIn("demostorage option is deprecated, use demo:// instead", str(w[-1].message)) - storage = factory() - self.assertTrue(isinstance(storage, DemoStorage)) - try: - self.assertTrue(os.path.exists(DB_FILE)) - self.assertTrue(os.path.exists(BLOB_DIR)) - finally: - storage.close() - - def test_dbargs(self): - resolver = self._makeOne() + f"file://{tmpdir}/{FS_FILENAME}?demostorage=true", + ) + + assert dbkw == {} + + warned, = log + assert issubclass(warned.category, DeprecationWarning) + assert ( + "demostorage option is deprecated, use demo:// instead" + in str(warned.message) + ) + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, DemoStorage) + assert isinstance(storage.base, FileStorage) + assert db_path.exists() + + +def test_fsresolver_invoke_factory_w_blobstorage(tmpdir): + fs_dir = pathlib.Path(tmpdir) + db_path = fs_dir / FS_FILENAME + blob_dir = fs_dir / FS_BLOBDIR + quoted = quote(str(blob_dir)) + resolver = _fs_resolver() + + factory, dbkw = resolver( + f"file://{tmpdir}/{FS_FILENAME}?quota=200" + f"&blobstorage_dir={quoted}&blobstorage_layout=bushy" + ) + + assert dbkw == {} + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, BlobStorage) + assert blob_dir.exists() + assert db_path.exists() + + +def test_fsresolver_invoke_factory_w_blobstorage_and_demostorage(tmpdir): + fs_dir = pathlib.Path(tmpdir) + db_path = fs_dir / FS_FILENAME + blob_dir = fs_dir / FS_BLOBDIR + quoted = quote(str(blob_dir)) + resolver = _fs_resolver() + + with warnings.catch_warnings(record=True) as log: factory, dbkw = resolver( - 'file:///tmp/../foo/bar?connection_pool_size=1' - '&connection_cache_size=1&database_name=dbname') - self.assertEqual(dbkw, {'connection_cache_size': '1', - 'connection_pool_size': '1', - 'database_name': 'dbname'}) - - -class TestClientStorageURIResolver(unittest.TestCase): - def _getTargetClass(self): - from zodburi.resolvers import ClientStorageURIResolver - return ClientStorageURIResolver - - def _makeOne(self): - klass = self._getTargetClass() - return klass() - - def test_bool_args(self): - resolver = self._makeOne() - f = resolver.interpret_kwargs - kwargs = f({'read_only':'1'}) - self.assertEqual(kwargs[0], {'read_only':1}) - kwargs = f({'read_only':'true'}) - self.assertEqual(kwargs[0], {'read_only':1}) - kwargs = f({'read_only':'on'}) - self.assertEqual(kwargs[0], {'read_only':1}) - kwargs = f({'read_only':'off'}) - self.assertEqual(kwargs[0], {'read_only':0}) - kwargs = f({'read_only':'no'}) - self.assertEqual(kwargs[0], {'read_only':0}) - kwargs = f({'read_only':'false'}) - self.assertEqual(kwargs[0], {'read_only':0}) - - @mock.patch('zodburi.resolvers.ClientStorage') - def test_call_tcp_no_port(self, ClientStorage): - resolver = self._makeOne() - factory, dbkw = resolver('zeo://localhost?debug=true') + f"file://{tmpdir}/{FS_FILENAME}?quota=200&demostorage=true" + f"&blobstorage_dir={quoted}&blobstorage_layout=bushy" + ) + + assert dbkw == {} + + warned, = log + assert issubclass(warned.category, DeprecationWarning) + assert ( + "demostorage option is deprecated, use demo:// instead" + in str(warned.message) + ) + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, DemoStorage) + assert isinstance(storage.base, BlobStorage) + assert blob_dir.exists() + assert db_path.exists() + + +@pytest.mark.parametrize("uri, expected_args, expected_kwargs", [ + ("zeo://localhost", (("localhost", 9991),), {}), + ("zeo://localhost:8080?debug=true", (("localhost", 8080),), {"debug": 1}), + ("zeo://[::1]", (("::1", 9991),), {}), + ("zeo://[::1]:9990?debug=true", (("::1", 9990),), {"debug": 1}), + ("zeo:///var/sock?debug=true", ("/var/sock",), {"debug": 1}), + ("zeo:///var/nosuchfile?wait=false", ("/var/nosuchfile",), {"wait": 0}), + ( + ( + 'zeo:///var/nosuchfile?' + 'storage=main&' + 'cache_size=1kb&' + 'name=foo&' + 'client=bar&' + 'var=baz&' + 'min_disconnect_poll=2&' + 'max_disconnect_poll=3&' + 'wait_for_server_on_startup=true&' + 'wait=4&' + 'wait_timeout=5&' + 'read_only=6&' + 'read_only_fallback=7&' + 'drop_cache_rather_verify=true&' + 'username=monty&' + 'password=python&' + 'realm=blat&' + 'blob_dir=some/dir&' + 'shared_blob_dir=true&' + 'blob_cache_size=1kb&' + 'blob_cache_size_check=8&' + 'client_label=fink&' + ), + ('/var/nosuchfile',), + dict( + storage='main', + cache_size=1024, + name='foo', + client='bar', + var='baz', + min_disconnect_poll=2, + max_disconnect_poll=3, + wait_for_server_on_startup=1, + wait=4, + wait_timeout=5, + read_only=6, + read_only_fallback=7, + drop_cache_rather_verify=1, + username='monty', + password='python', + realm='blat', + blob_dir='some/dir', + shared_blob_dir=1, + blob_cache_size=1024, + blob_cache_size_check=8, + client_label='fink', + ), + ), +]) +def test_client_resolver___call___(uri, expected_args, expected_kwargs): + resolver = _client_resolver() + + factory, dbkw = resolver(uri) + + assert dbkw == {} + + with mock.patch("zodburi.resolvers.ClientStorage") as cs: factory() - ClientStorage.assert_called_once_with(('localhost', 9991), debug=1) - @mock.patch('zodburi.resolvers.ClientStorage') - def test_call_tcp(self, ClientStorage): - resolver = self._makeOne() - factory, dbkw = resolver('zeo://localhost:8080?debug=true') - factory() - ClientStorage.assert_called_once_with(('localhost', 8080), debug=1) + cs.assert_called_once_with(*expected_args, **expected_kwargs) - @mock.patch('zodburi.resolvers.ClientStorage') - def test_call_ipv6_no_port(self, ClientStorage): - resolver = self._makeOne() - factory, dbkw = resolver('zeo://[::1]?debug=true') - factory() - ClientStorage.assert_called_once_with(('::1', 9991), debug=1) +def test_client_resolver___call___check_dbkw(): + resolver = _client_resolver() - @mock.patch('zodburi.resolvers.ClientStorage') - def test_call_ipv6(self, ClientStorage): - resolver = self._makeOne() - factory, dbkw = resolver('zeo://[::1]:9090?debug=true') - factory() - ClientStorage.assert_called_once_with(('::1', 9090), debug=1) + factory, dbkw = resolver( + "zeo://localhost:8080?" + "connection_pool_size=1&" + "connection_cache_size=1&" + "database_name=dbname" + ) + assert dbkw == { + "connection_pool_size": "1", + "connection_cache_size": "1", + "database_name": "dbname", + } - @mock.patch('zodburi.resolvers.ClientStorage') - def test_call_unix(self, ClientStorage): - resolver = self._makeOne() - factory, dbkw = resolver('zeo:///var/sock?debug=true') - factory() - ClientStorage.assert_called_once_with('/var/sock', debug=1) - - @mock.patch('zodburi.resolvers.ClientStorage') - def test_invoke_factory(self, ClientStorage): - resolver = self._makeOne() - factory, dbkw = resolver('zeo:///var/nosuchfile?wait=false') - storage = factory() - storage.close() - ClientStorage.assert_called_once_with('/var/nosuchfile', wait=0) - - @mock.patch('zodburi.resolvers.ClientStorage') - def test_factory_kwargs(self, ClientStorage): - resolver = self._makeOne() - factory, dbkw = resolver('zeo:///var/nosuchfile?' - 'storage=main&' - 'cache_size=1kb&' - 'name=foo&' - 'client=bar&' - 'var=baz&' - 'min_disconnect_poll=2&' - 'max_disconnect_poll=3&' - 'wait_for_server_on_startup=true&' - 'wait=4&' - 'wait_timeout=5&' - 'read_only=6&' - 'read_only_fallback=7&' - 'drop_cache_rather_verify=true&' - 'username=monty&' - 'password=python&' - 'realm=blat&' - 'blob_dir=some/dir&' - 'shared_blob_dir=true&' - 'blob_cache_size=1kb&' - 'blob_cache_size_check=8&' - 'client_label=fink&' - ) - storage = factory() - storage.close() - ClientStorage.assert_called_once_with('/var/nosuchfile', - storage='main', - cache_size=1024, - name='foo', - client='bar', - var='baz', - min_disconnect_poll=2, - max_disconnect_poll=3, - wait_for_server_on_startup=1, - wait=4, - wait_timeout=5, - read_only=6, - read_only_fallback=7, - drop_cache_rather_verify=1, - username='monty', - password='python', - realm='blat', - blob_dir='some/dir', - shared_blob_dir=1, - blob_cache_size=1024, - blob_cache_size_check=8, - client_label='fink', - ) - - - @mock.patch('zodburi.resolvers.ClientStorage') - def test_invoke_factory_demostorage(self, ClientStorage): - from ZODB.DemoStorage import DemoStorage - resolver = self._makeOne() - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - factory, dbkw = resolver('zeo:///var/nosuchfile?wait=false' - '&demostorage=true') - self.assertEqual(len(w), 1) - self.assertTrue(issubclass(w[-1].category, DeprecationWarning)) - self.assertIn("demostorage option is deprecated, use demo:// instead", str(w[-1].message)) - storage = factory() - storage.close() - self.assertTrue(isinstance(storage, DemoStorage)) - - def test_dbargs(self): - resolver = self._makeOne() - factory, dbkw = resolver('zeo://localhost:8080?debug=true&' - 'connection_pool_size=1&' - 'connection_cache_size=1&' - 'database_name=dbname') - self.assertEqual(dbkw, {'connection_pool_size': '1', - 'connection_cache_size': '1', - 'database_name': 'dbname'}) - - -class TestZConfigURIResolver(unittest.TestCase): - def _getTargetClass(self): - from zodburi.resolvers import ZConfigURIResolver - return ZConfigURIResolver - - def _makeOne(self): - klass = self._getTargetClass() - return klass() - - def setUp(self): - import tempfile - self.tmp = tempfile.NamedTemporaryFile() - - def tearDown(self): - self.tmp.close() - - def test_named_storage(self): - self.tmp.write(b""" - - - - - - """) - self.tmp.flush() - resolver = self._makeOne() - factory, dbkw = resolver('zconfig://%s#bar' % self.tmp.name) - storage = factory() - from ZODB.MappingStorage import MappingStorage - self.assertTrue(isinstance(storage, MappingStorage), storage) - - def test_anonymous_storage(self): - self.tmp.write(b""" - - - - - - """) - self.tmp.flush() - resolver = self._makeOne() - factory, dbkw = resolver('zconfig://%s' % self.tmp.name) - storage = factory() - from ZODB.MappingStorage import MappingStorage - self.assertTrue(isinstance(storage, MappingStorage)) - self.assertEqual(dbkw, {}) - - def test_query_string_args(self): - self.tmp.write(b""" - - - - - - """) - self.tmp.flush() - resolver = self._makeOne() - factory, dbkw = resolver('zconfig://%s?foo=bar' % self.tmp.name) - self.assertEqual(dbkw, {'foo': 'bar'}) - - def test_storage_not_found(self): - self.tmp.write(b""" - - - """) - self.tmp.flush() - resolver = self._makeOne() - self.assertRaises(KeyError, resolver, 'zconfig://%s#y' % self.tmp.name) - - def test_anonymous_database(self): - self.tmp.write(b""" - - - - - """) - self.tmp.flush() - resolver = self._makeOne() - factory, dbkw = resolver('zconfig://%s' % self.tmp.name) - storage = factory() - from ZODB.MappingStorage import MappingStorage - self.assertTrue(isinstance(storage, MappingStorage)) - self.assertEqual(dbkw, - {'connection_cache_size': 5000, - 'connection_cache_size_bytes': 0, - 'connection_historical_cache_size': 1000, - 'connection_historical_cache_size_bytes': 0, - 'connection_historical_pool_size': 3, - 'connection_historical_timeout': 300, - 'connection_large_record_size': 16777216, - 'connection_pool_size': 7}) - - - def test_named_database(self): - self.tmp.write(b""" - - - - database-name foo - cache-size 20000 - pool-size 5 - - """) - self.tmp.flush() - resolver = self._makeOne() - factory, dbkw = resolver('zconfig://%s#x' % self.tmp.name) - storage = factory() - from ZODB.MappingStorage import MappingStorage - self.assertTrue(isinstance(storage, MappingStorage)) - self.assertEqual(dbkw, - {'connection_cache_size': 20000, - 'connection_cache_size_bytes': 0, - 'connection_historical_cache_size': 1000, - 'connection_historical_cache_size_bytes': 0, - 'connection_historical_pool_size': 3, - 'connection_historical_timeout': 300, - 'connection_large_record_size': 16777216, - 'connection_pool_size': 5, - 'database_name': 'foo'}) - - - def test_database_all_options(self): - from zodburi import CONNECTION_PARAMETERS, BYTES_PARAMETERS - self.tmp.write((""" - - - - database-name foo - %s - - """ % '\n'.join("{} {}".format( - name.replace('_', '-'), - '%sMB' % i if name in BYTES_PARAMETERS else i, - ) - for (i, name) - in enumerate(CONNECTION_PARAMETERS) - )).encode()) - self.tmp.flush() - resolver = self._makeOne() - factory, dbkw = resolver('zconfig://%s#x' % self.tmp.name) - storage = factory() - from ZODB.MappingStorage import MappingStorage - self.assertTrue(isinstance(storage, MappingStorage)) - expect = dict(database_name='foo') - for i, parameter in enumerate(CONNECTION_PARAMETERS): - cparameter = 'connection_' + parameter - expect[cparameter] = i - if parameter in BYTES_PARAMETERS: - expect[cparameter] *= 1<<20 - self.assertEqual(dbkw, expect) - - def test_database_integration_because_ints(self): - from zodburi import resolve_uri - self.tmp.write(b""" - - - - - """) - self.tmp.flush() - from zodburi import resolve_uri - factory, dbkw = resolve_uri('zconfig://%s' % self.tmp.name) - storage = factory() - from ZODB.MappingStorage import MappingStorage - self.assertTrue(isinstance(storage, MappingStorage)) - self.assertEqual(dbkw, - {'cache_size': 5000, - 'cache_size_bytes': 0, - 'historical_cache_size': 1000, - 'historical_cache_size_bytes': 0, - 'historical_pool_size': 3, - 'historical_timeout': 300, - 'large_record_size': 16777216, - 'pool_size': 7, - 'database_name': 'unnamed'}) - - -class TestMappingStorageURIResolver(Base, unittest.TestCase): - - def _getTargetClass(self): - from zodburi.resolvers import MappingStorageURIResolver - return MappingStorageURIResolver - - def _makeOne(self): - klass = self._getTargetClass() - return klass() - - def test_call_no_qs(self): - resolver = self._makeOne() - factory, dbkw = resolver('memory://') - self.assertEqual(dbkw, {}) - storage = factory() - from ZODB.MappingStorage import MappingStorage - self.assertTrue(isinstance(storage, MappingStorage)) - self.assertEqual(storage.__name__, '') - - def test_call_with_qs(self): - uri='memory://storagename?connection_cache_size=100&database_name=fleeb' - resolver = self._makeOne() - factory, dbkw = resolver(uri) - self.assertEqual(dbkw, {'connection_cache_size': '100', - 'database_name': 'fleeb'}) - storage = factory() - from ZODB.MappingStorage import MappingStorage - self.assertTrue(isinstance(storage, MappingStorage)) - self.assertEqual(storage.__name__, 'storagename') - - -class TestDemoStorageURIResolver(unittest.TestCase): - - def _getTargetClass(self): - from zodburi.resolvers import DemoStorageURIResolver - return DemoStorageURIResolver - - def _makeOne(self): - klass = self._getTargetClass() - return klass() - - def test_invalid_uri_no_match(self): - from zodburi.resolvers import InvalidDemoStorgeURI - resolver = self._makeOne() - - with pytest.raises(InvalidDemoStorgeURI): - resolver("bogus:name") - - def test_invalid_uri_kwargs_in_base(self): - from zodburi.resolvers import InvalidDemoStorgeURI - resolver = self._makeOne() - - with pytest.raises(InvalidDemoStorgeURI): - resolver( - "demo:" - "(file:///tmp/blah?pool_size=1234)/" - "(file:///tmp/qux)" - ) - - def test_invalid_uri_kwargs_in_changes(self): - from zodburi.resolvers import InvalidDemoStorgeURI - resolver = self._makeOne() - - with pytest.raises(InvalidDemoStorgeURI): - resolver( - "demo:" - "(file:///tmp/blah)/" - "(file:///tmp/qux?pool_size=1234)" - ) - - def test_fsoverlay(self): - import os.path, tempfile, shutil - tmpdir = tempfile.mkdtemp() - def _(): - shutil.rmtree(tmpdir) - self.addCleanup(_) - - resolver = self._makeOne() - basef = os.path.join(tmpdir, 'base.fs') - changef = os.path.join(tmpdir, 'changes.fs') - self.assertFalse(os.path.exists(basef)) - self.assertFalse(os.path.exists(changef)) - factory, dbkw = resolver('demo:(file://{})/(file://{}?quota=200)'.format(basef, changef)) - self.assertEqual(dbkw, {}) - demo = factory() - from ZODB.DemoStorage import DemoStorage - from ZODB.FileStorage import FileStorage - self.assertTrue(isinstance(demo, DemoStorage)) - self.assertTrue(isinstance(demo.base, FileStorage)) - self.assertTrue(isinstance(demo.changes, FileStorage)) - self.assertTrue(os.path.exists(basef)) - self.assertTrue(os.path.exists(changef)) - self.assertEqual(demo.changes._quota, 200) - - def test_parse_frag(self): - resolver = self._makeOne() - factory, dbkw = resolver('demo:(memory://111)/(memory://222)#foo=bar&abc=def') - self.assertEqual(dbkw, {'foo': 'bar', 'abc': 'def'}) - demo = factory() - from ZODB.DemoStorage import DemoStorage - from ZODB.MappingStorage import MappingStorage - self.assertTrue(isinstance(demo, DemoStorage)) - self.assertTrue(isinstance(demo.base, MappingStorage)) - self.assertEqual(demo.base.__name__, '111') - self.assertTrue(isinstance(demo.changes, MappingStorage)) - self.assertEqual(demo.changes.__name__, '222') - - -class TestEntryPoints(unittest.TestCase): - - def test_it(self): - from zodburi import resolvers - - our_eps = { - ep.name: ep for ep in distribution("zodburi").entry_points - } - expected = [ - ('memory', resolvers.MappingStorageURIResolver), - ('zeo', resolvers.ClientStorageURIResolver), - ('file', resolvers.FileStorageURIResolver), - ('zconfig', resolvers.ZConfigURIResolver), - ('demo', resolvers.DemoStorageURIResolver), - ] - for name, cls in expected: - target = our_eps[name].load() - self.assertTrue(isinstance(target, cls)) + +def test_client_resolver_invoke_factory(): + resolver = _client_resolver() + + factory, dbkw = resolver("zeo:///var/nosouchfile?wait=false") + assert dbkw == {} + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, ClientStorage) + # Stub out the storage's server to allow close to complete. + storage._server = mock.Mock(spec_set=("close",)) + + +def test_client_resolver_invoke_factory_w_demostorage(): + resolver = _client_resolver() + + with warnings.catch_warnings(record=True) as log: + factory, dbkw = resolver( + f"zeo:///var/nosuchfile?demostorage=true&wait=false", + ) + + assert dbkw == {} + + warned, = log + assert issubclass(warned.category, DeprecationWarning) + assert ( + "demostorage option is deprecated, use demo:// instead" + in str(warned.message) + ) + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, DemoStorage) + assert isinstance(storage.base, ClientStorage) + # Stub out the storage's server to allow close to complete. + storage.base._server = mock.Mock(spec_set=("close",)) + + +def test_zconfig_resolver___call___check_dbkw(zconfig_path): + zconfig_path.write_text( + """\ + + +""" + ) + resolver = _zconfig_resolver() + factory, dbkw = resolver(f"zconfig://{zconfig_path}?foo=bar") + + assert dbkw == {"foo": "bar"} + + +def test_zconfig_resolver___call___w_unknown_storage(zconfig_path): + zconfig_path.write_text( + """\ + + +""" + ) + resolver = _zconfig_resolver() + with pytest.raises(KeyError): + resolver(f"zconfig://{zconfig_path}#y") + + + +def test_zconfig_resolver_invoke_factory_w_named_storage(zconfig_path): + zconfig_path.write_text( + """\ + + + + + +""" + ) + resolver = _zconfig_resolver() + factory, dbkw = resolver(f"zconfig://{zconfig_path}#bar") + + assert dbkw == {} + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, MappingStorage) + + +def test_zconfig_resolver_invoke_factory_w_anonymous_storage(zconfig_path): + zconfig_path.write_text( + """\ + + + + + +""" + ) + resolver = _zconfig_resolver() + factory, dbkw = resolver(f"zconfig://{zconfig_path}") + + assert dbkw == {} + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, MappingStorage) + + +def test_zconfig_resolver_invoke_factory_w_anon_db_w_defaults(zconfig_path): + zconfig_path.write_text( + """\ + + + + +""" + ) + resolver = _zconfig_resolver() + factory, dbkw = resolver(f"zconfig://{zconfig_path}") + + assert dbkw == { + "connection_cache_size": 5000, + "connection_cache_size_bytes": 0, + "connection_historical_cache_size": 1000, + "connection_historical_cache_size_bytes": 0, + "connection_historical_pool_size": 3, + "connection_historical_timeout": 300, + "connection_large_record_size": 16777216, + "connection_pool_size": 7, + } + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, MappingStorage) + + +def test_zconfig_resolver_invoke_factory_w_named_db_w_explicit(zconfig_path): + zconfig_path.write_text( + """\ + + + + + database-name foo + cache-size 20000 + pool-size 5 + +""" + ) + resolver = _zconfig_resolver() + factory, dbkw = resolver(f"zconfig://{zconfig_path}#x") + + assert dbkw == { + "database_name": "foo", + "connection_cache_size": 20000, + "connection_cache_size_bytes": 0, + "connection_historical_cache_size": 1000, + "connection_historical_cache_size_bytes": 0, + "connection_historical_pool_size": 3, + "connection_historical_timeout": 300, + "connection_large_record_size": 16777216, + "connection_pool_size": 5, + } + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, MappingStorage) + + +def test_zconfig_resolver_invoke_factory_w_all_options(zconfig_path): + from zodburi import CONNECTION_PARAMETERS, BYTES_PARAMETERS + + all_params = [ + ( + name.replace("_", "-"), + "%sMB" % i if name in BYTES_PARAMETERS else str(i), + ) + for (i, name) in enumerate(CONNECTION_PARAMETERS) + ] + params_str = "\n".join( + [f"{name} {value}" for name, value in all_params] + ) + zconfig_path.write_text( + f"""\ + + + +database-name foo +{params_str} + +""" + ) + resolver = _zconfig_resolver() + factory, dbkw = resolver(f"zconfig://{zconfig_path}#x") + + expected = {"database_name": "foo"} + + for i, parameter in enumerate(CONNECTION_PARAMETERS): + cparameter = f"connection_{parameter}" + + if parameter in BYTES_PARAMETERS: + expected[cparameter] = i * 1024 * 1024 + else: + expected[cparameter] = i + + assert dbkw == expected + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, MappingStorage) + + + +def test_resolve_uri_w_zconfig(zconfig_path): + from zodburi import resolve_uri + + zconfig_path.write_text("""\ + + + + +""" + ) + factory, dbkw = resolve_uri(f"zconfig://{zconfig_path}") + + assert dbkw == { + "cache_size": 5000, + "cache_size_bytes": 0, + "historical_cache_size": 1000, + "historical_cache_size_bytes": 0, + "historical_pool_size": 3, + "historical_timeout": 300, + "large_record_size": 16777216, + "pool_size": 7, + "database_name": "unnamed", + } + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, MappingStorage) + + +def test_mapping_resolver_wo_qs(): + resolver = _mapping_resolver() + + factory, dbkw = resolver("memory://") + + assert dbkw == {} + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, MappingStorage) + assert storage.__name__ == "" + + +def test_mapping_resolver_w_qs(): + resolver = _mapping_resolver() + + factory, dbkw = resolver( + "memory://storagename?connection_cache_size=100&database_name=fleeb" + ) + + assert dbkw == { + "connection_cache_size": "100", + "database_name": "fleeb" + } + + with contextlib.closing(factory()) as storage: + assert isinstance(storage, MappingStorage) + assert storage.__name__ == "storagename" + + +def test_demo_resolver_w_invalid_uri_no_match(): + from zodburi.resolvers import InvalidDemoStorgeURI + + resolver = _demo_resolver() + + with pytest.raises(InvalidDemoStorgeURI): + resolver("bogus:name") + + +def test_demo_resolver_w_invalid_uri_kwargs_in_base(): + from zodburi.resolvers import InvalidDemoStorgeURI + + resolver = _demo_resolver() + + with pytest.raises(InvalidDemoStorgeURI): + resolver( + "demo:" + "(file:///tmp/blah?pool_size=1234)/" + "(file:///tmp/qux)" + ) + + +def test_demo_resolver_w_invalid_uri_kwargs_in_changes(): + from zodburi.resolvers import InvalidDemoStorgeURI + + resolver = _demo_resolver() + + with pytest.raises(InvalidDemoStorgeURI): + resolver( + "demo:" + "(file:///tmp/blah)/" + "(file:///tmp/qux?pool_size=1234)" + ) + + +def test_demo_resolver_invoke_factory_w_fs_overlay(tmpdir): + fs_dir = pathlib.Path(tmpdir) + base_path = fs_dir / "base.fs" + changes_path = fs_dir / "changes.fs" + assert not base_path.exists() + assert not changes_path.exists() + demo_uri = ( + f"demo:" + f"(file://{base_path})/" + f"(file://{changes_path}" + f"?quota=200)" + ) + + resolver = _demo_resolver() + + factory, dbkw = resolver(demo_uri) + + assert dbkw == {} + + with contextlib.closing(factory()) as demo: + assert isinstance(demo, DemoStorage) + assert isinstance(demo.base, FileStorage) + assert isinstance(demo.changes, FileStorage) + assert base_path.exists() + assert changes_path.exists() + assert demo.changes._quota == 200 + + +def test_demo_resolver_invoke_factory_w_qs_parms(tmpdir): + fs_dir = pathlib.Path(tmpdir) + base_path = fs_dir / "base.fs" + changes_path = fs_dir / "changes.fs" + assert not base_path.exists() + assert not changes_path.exists() + demo_uri = "demo:(memory://111)/(memory://222)#foo=bar&abc=def" + + resolver = _demo_resolver() + + factory, dbkw = resolver(demo_uri) + + assert dbkw == {'foo': 'bar', 'abc': 'def'} + + with contextlib.closing(factory()) as demo: + assert isinstance(demo, DemoStorage) + assert isinstance(demo.base, MappingStorage) + demo.base.__name__ == '111' + assert isinstance(demo.changes, MappingStorage) + assert demo.changes.__name__ == '222' + + +def test_entry_points(): + from zodburi import resolvers + + our_eps = { + ep.name: ep for ep in distribution("zodburi").entry_points + } + expected = [ + ('memory', resolvers.MappingStorageURIResolver), + ('zeo', resolvers.ClientStorageURIResolver), + ('file', resolvers.FileStorageURIResolver), + ('zconfig', resolvers.ZConfigURIResolver), + ('demo', resolvers.DemoStorageURIResolver), + ] + for name, cls in expected: + target = our_eps[name].load() + assert isinstance(target, cls)