diff --git a/zodburi/__init__.py b/zodburi/__init__.py
index d58275a..d9c7a7c 100644
--- a/zodburi/__init__.py
+++ b/zodburi/__init__.py
@@ -14,37 +14,41 @@
)
BYTES_PARAMETERS = (
- 'cache_size_bytes',
- 'historical_cache_size_bytes',
- 'large_record_size'
+ "cache_size_bytes",
+ "historical_cache_size_bytes",
+ "large_record_size"
)
PARAMETERS = dict(
- [('database_name', 'database_name')] +
- [(f'connection_{parm}', parm) for parm in CONNECTION_PARAMETERS]
+ [("database_name", "database_name")] +
+ [(f"connection_{parm}", parm) for parm in CONNECTION_PARAMETERS]
)
-HAS_UNITS_RE = re.compile(r'\s*(\d+)\s*([kmg])b\s*$')
+HAS_UNITS_RE = re.compile(r"\s*(\d+)\s*([kmg])b\s*$")
UNITS = dict(k=1<<10, m=1<<20, g=1<<30)
+
_DEFAULT_DBKW = {
- 'cache_size': 10000,
- 'pool_size': 7,
- 'database_name': 'unnamed',
+ "cache_size": 10000,
+ "pool_size": 7,
+ "database_name": "unnamed",
}
+
class NoResolverForScheme(KeyError):
def __init__(self, uri):
self.uri = uri
- super().__init__(f'No resolver found for uri: {uri}')
+ super().__init__(f"No resolver found for uri: {uri}")
+
class UnknownDatabaseKeywords(KeyError):
def __init__(self, kw):
self.kw = kw
super().__init__(
- f'Unrecognized database keyword(s): {", ".join(kw)}'
+ f"Unrecognized database keyword(s): {", ".join(kw)}"
)
+
def resolve_uri(uri):
"""
Returns a tuple, (factory, dbkw) where factory is a no-arg callable which
@@ -54,9 +58,10 @@ def resolve_uri(uri):
factory, dbkw = _resolve_uri(uri)
return factory, _get_dbkw(dbkw)
+
# _resolve_uri serves resolve_uri: it returns factory and original raw dbkw.
def _resolve_uri(uri):
- scheme = uri[:uri.find(':')]
+ scheme = uri[:uri.find(":")]
try:
resolver_eps = entry_points(group="zodburi.resolvers")
except TypeError: # pragma: NO COVER Python < 3.10
@@ -70,21 +75,24 @@ def _resolve_uri(uri):
else:
raise NoResolverForScheme(uri)
+
def _parse_bytes(s):
m = HAS_UNITS_RE.match(s.lower())
+
if m:
v, uname = m.group(1, 2)
return int(v) * UNITS[uname]
else:
return int(s)
+
def _get_dbkw(kw):
dbkw = _DEFAULT_DBKW.copy()
for parameter in PARAMETERS:
if parameter in kw:
v = kw.pop(parameter)
- if parameter.startswith('connection_'):
+ if parameter.startswith("connection_"):
if not isinstance(v, int):
if PARAMETERS[parameter] in BYTES_PARAMETERS:
v = _parse_bytes(v)
diff --git a/zodburi/datatypes.py b/zodburi/datatypes.py
index cfeed9b..c039058 100644
--- a/zodburi/datatypes.py
+++ b/zodburi/datatypes.py
@@ -1,43 +1,67 @@
TRUETYPES = ('1', 'on', 'true', 't', 'yes')
FALSETYPES = ('', '0', 'off', 'false', 'f', 'no')
+
+class SuffixLengthMismatch(ValueError):
+ def __init__(self, d):
+ self.d = d
+ super().__init__("All suffix keys must have the same length")
+
+
class SuffixMultiplier:
- # d is a dictionary of suffixes to integer multipliers. If no suffixes
- # match, default is the multiplier. Matches are case insensitive. Return
- # values are in the fundamental unit.
+ """Convert integer-like strings w/ size suffixes to integers
+
+ - 'd' is a dictionary of suffixes to integer multipliers.
+ - 'default' is the multiplier if no suffixes match.
+
+ Matches are case insensitive.
+
+ Returned values are in the fundamental unit.
+ """
def __init__(self, d, default=1):
- self._d = d
- self._default = default
# all keys must be the same size
- self._keysz = None
- for k in d.keys():
- if self._keysz is None:
- self._keysz = len(k)
- else:
- if self._keysz != len(k):
- raise ValueError('suffix length missmatch')
+ sizes = set(len(key) for key in d)
+
+ if len(sizes) > 1:
+ raise SuffixLengthMismatch(d)
+
+ self._d = {key.lower(): value for key, value in d.items()}
+ self._default = default
+ self._keysz = sizes.pop() if sizes else 0
def __call__(self, v):
- v = v.lower()
- for s, m in self._d.items():
- if v[-self._keysz:] == s:
- return int(v[:-self._keysz]) * m
- return int(v) * self._default
+ if self._keysz and len(v) > self._keysz:
+ v = v.lower()
+ suffix = v[-self._keysz:]
+ multiplier = self._d.get(suffix, self._default)
+
+ if multiplier is not self._default:
+ v = v[:-self._keysz]
+ else:
+ multiplier = self._default
+
+ return int(v) * multiplier
+
-convert_bytesize = SuffixMultiplier({'kb': 1024,
- 'mb': 1024*1024,
- 'gb': 1024*1024*1024,
- })
+convert_bytesize = SuffixMultiplier({
+ "kb": 1024,
+ "mb": 1024*1024,
+ "gb": 1024*1024*1024,
+})
def convert_int(value):
# boolean values are also treated as integers
value = value.lower()
+
if value in FALSETYPES:
return 0
+
if value in TRUETYPES:
return 1
+
return int(value)
+
def convert_tuple(value):
- return tuple(value.split(','))
+ return tuple(value.split(","))
diff --git a/zodburi/tests/test___init__.py b/zodburi/tests/test___init__.py
index 0c7d38e..7a6915a 100644
--- a/zodburi/tests/test___init__.py
+++ b/zodburi/tests/test___init__.py
@@ -3,7 +3,6 @@
import pytest
import zodburi
-from zodburi import resolvers
@pytest.mark.parametrize("source, expected", [
@@ -24,6 +23,7 @@ def _expected_dbkw(**kw):
dbkw.update(kw)
return dbkw
+
@pytest.mark.parametrize("kw, expected", [
({}, _expected_dbkw()),
({"database_name": "foo"}, _expected_dbkw(database_name="foo")),
diff --git a/zodburi/tests/test_datatypes.py b/zodburi/tests/test_datatypes.py
index 3862721..289d33e 100644
--- a/zodburi/tests/test_datatypes.py
+++ b/zodburi/tests/test_datatypes.py
@@ -1,102 +1,126 @@
import unittest
+import pytest
+
_marker = object()
-class SuffixMultiplierTests(unittest.TestCase):
- def _getTargetClass(self):
- from zodburi.datatypes import SuffixMultiplier
- return SuffixMultiplier
- def _makeOne(self, d=None, default=_marker):
- if d is None:
- d = {}
- if default is _marker:
- return self._getTargetClass()(d)
- return self._getTargetClass()(d, default)
+def _suffix_multiplier(d=None, default=_marker):
+ from zodburi.datatypes import SuffixMultiplier
+
+ if d is None:
+ d = {}
+
+ if default is _marker:
+ return SuffixMultiplier(d)
+
+ return SuffixMultiplier(d, default)
+
+
+def test_suffixmultiplier___init___w_defaults():
+ sm = _suffix_multiplier()
+ assert sm._d == {}
+ assert sm._default == 1
+ assert sm._keysz == 0
+
+
+def test_suffixmultiplier___init___w_explicit_default():
+ sm = _suffix_multiplier(default=3)
+ assert sm._d == {}
+ assert sm._default == 3
+ assert sm._keysz == 0
+
+
+def test_suffixmultiplier___init___w_normal_suffixes():
+ SFX = {"aaa": 2, "bbb": 3}
+ sm = _suffix_multiplier(SFX)
+ assert sm._d == SFX
+ assert sm._default == 1
+ assert sm._keysz == 3
+
+
+def test_suffixmultiplier___init___w_mismatched_suffixes():
+ SFX = {"aaa": 2, "bbbb": 3}
+
+ with pytest.raises(ValueError):
+ _suffix_multiplier(SFX)
+
+
+def test_suffixmultiplier___call____miss():
+ SFX = {"aaa": 2, "bbb": 3}
+ sm = _suffix_multiplier(SFX)
+ assert sm("14") == 14
+
+
+def test_suffixmultiplier___call___hit():
+ SFX = {"aaa": 2, "bbb": 3}
+ sm = _suffix_multiplier(SFX)
+ assert sm("14bbb") == 42
+
+
+def test_convert_bytesize_miss():
+ from zodburi.datatypes import convert_bytesize
+
+ assert convert_bytesize("14") == 14
- def test_ctor_simple(self):
- sm = self._makeOne()
- self.assertEqual(sm._d, {})
- self.assertEqual(sm._default, 1)
- self.assertEqual(sm._keysz, None)
- def test_ctor_w_explicit_default(self):
- sm = self._makeOne(default=3)
- self.assertEqual(sm._default, 3)
+@pytest.mark.parametrize("sized, expected", [
+ ("14", 14),
+ ("200", 200),
+ ("14kb", 14 * 1024),
+ ("14mb", 14 * 1024 * 1024),
+ ("14gb", 14 * 1024 * 1024 * 1024),
+])
+def test_convert_bytesize_hit(sized, expected):
+ from zodburi.datatypes import convert_bytesize
- def test_ctor_w_normal_suffixes(self):
- SFX = {'aaa': 2, 'bbb': 3}
- sm = self._makeOne(SFX)
- self.assertEqual(sm._d, SFX)
- self.assertEqual(sm._default, 1)
- self.assertEqual(sm._keysz, 3)
+ assert convert_bytesize(sized) == expected
- def test_ctor_w_mismatched_suffixes(self):
- SFX = {'aaa': 2, 'bbbb': 3}
- self.assertRaises(ValueError, self._makeOne, SFX)
- def test___call____miss(self):
- SFX = {'aaa': 2, 'bbb': 3}
- sm = self._makeOne(SFX)
- self.assertEqual(sm('14'), 14)
+def test_convert_int_w_falsetypes():
+ from zodburi.datatypes import convert_int
+ from zodburi.datatypes import FALSETYPES
- def test___call____hit(self):
- SFX = {'aaa': 2, 'bbb': 3}
- sm = self._makeOne(SFX)
- self.assertEqual(sm('14bbb'), 42)
+ for v in FALSETYPES:
+ assert convert_int(v) == 0
+ assert convert_int(v.title()) == 0
-class Test_convert_bytesize(unittest.TestCase):
+def test_convert_int_w_truetypes():
+ from zodburi.datatypes import convert_int
+ from zodburi.datatypes import TRUETYPES
- def _callFUT(self, value):
- from zodburi.datatypes import convert_bytesize
- return convert_bytesize(value)
+ for v in TRUETYPES:
+ assert convert_int(v) == 1
+ assert convert_int(v.title()) == 1
- def test_hit(self):
- self.assertEqual(self._callFUT('14kb'), 14 * 1024)
- self.assertEqual(self._callFUT('14mb'), 14 * 1024 * 1024)
- self.assertEqual(self._callFUT('14gb'), 14 * 1024 * 1024 * 1024)
- def test_miss(self):
- self.assertEqual(self._callFUT('14'), 14)
+def test_convert_int_w_normal():
+ from zodburi.datatypes import convert_int
+ assert convert_int("14") == 14
-class Test_convert_int(unittest.TestCase):
- def _callFUT(self, value):
- from zodburi.datatypes import convert_int
- return convert_int(value)
+def test_convert_int_w_invalid():
+ from zodburi.datatypes import convert_int
- def test_hit_falsetypes(self):
- from zodburi.datatypes import FALSETYPES
- for v in FALSETYPES:
- self.assertEqual(self._callFUT(v), 0)
- self.assertEqual(self._callFUT(v.title()), 0)
+ with pytest.raises(ValueError):
+ convert_int("notanint")
- def test_hit_truetypes(self):
- from zodburi.datatypes import TRUETYPES
- for v in TRUETYPES:
- self.assertEqual(self._callFUT(v), 1)
- self.assertEqual(self._callFUT(v.title()), 1)
- def test_hit_normal(self):
- self.assertEqual(self._callFUT('14'), 14)
+def test_convert_tuple_w_empty():
+ from zodburi.datatypes import convert_tuple
- def test_miss(self):
- self.assertRaises(ValueError, self._callFUT, 'notanint')
+ assert convert_tuple("") == ("",)
-class Test_convert_tuple(unittest.TestCase):
+def test_convert_tuple_wo_commas():
+ from zodburi.datatypes import convert_tuple
- def _callFUT(self, value):
- from zodburi.datatypes import convert_tuple
- return convert_tuple(value)
+ assert convert_tuple("abc") == ("abc",)
- def test_empty(self):
- self.assertEqual(self._callFUT(''), ('',))
- def test_wo_commas(self):
- self.assertEqual(self._callFUT('abc'), ('abc',))
+def test_convert_tuple_w_commas():
+ from zodburi.datatypes import convert_tuple
- def test_w_commas(self):
- self.assertEqual(self._callFUT('abc,def'), ('abc', 'def'))
+ assert convert_tuple("abc,def") == ("abc", "def")
diff --git a/zodburi/tests/test_resolvers.py b/zodburi/tests/test_resolvers.py
index 6c4fff2..d417030 100644
--- a/zodburi/tests/test_resolvers.py
+++ b/zodburi/tests/test_resolvers.py
@@ -1,682 +1,762 @@
+import contextlib
from importlib.metadata import distribution
+import os
+import pathlib
+import tempfile
from unittest import mock
from urllib.parse import quote
import unittest
import warnings
import pytest
+from ZEO.ClientStorage import ClientStorage
+from ZODB.blob import BlobStorage
+from ZODB.DemoStorage import DemoStorage
+from ZODB.FileStorage import FileStorage
+from ZODB.MappingStorage import MappingStorage
-class Base:
-
- def test_interpret_kwargs_noargs(self):
- resolver = self._makeOne()
- kwargs = resolver.interpret_kwargs({})
- self.assertEqual(kwargs, ({}, {}))
-
- def test_bytesize_args(self):
- resolver = self._makeOne()
- names = sorted(resolver._bytesize_args)
- kwargs = {}
- for name in names:
- kwargs[name] = '10MB'
- args = resolver.interpret_kwargs(kwargs)[0]
- keys = args.keys()
- self.assertEqual(sorted(keys), names)
- for name, value in args.items():
- self.assertEqual(value, 10*1024*1024)
-
- def test_int_args(self):
- resolver = self._makeOne()
- names = sorted(resolver._int_args)
- kwargs = {}
- for name in names:
- kwargs[name] = '10'
- args = resolver.interpret_kwargs(kwargs)[0]
- keys = sorted(args.keys())
- self.assertEqual(sorted(keys), sorted(names))
- for name, value in args.items():
- self.assertEqual(value, 10)
-
- def test_string_args(self):
- resolver = self._makeOne()
- names = sorted(resolver._string_args)
- kwargs = {}
- for name in names:
- kwargs[name] = 'string'
- args = resolver.interpret_kwargs(kwargs)[0]
- keys = args.keys()
- self.assertEqual(sorted(keys), names)
- for name, value in args.items():
- self.assertEqual(value, 'string')
-
- def test_float_args(self):
- resolver = self._makeOne()
- resolver._float_args = ('pi', 'PI')
- names = sorted(resolver._float_args)
- kwargs = {}
- for name in names:
- kwargs[name] = '3.14'
- args = resolver.interpret_kwargs(kwargs)[0]
- keys = args.keys()
- self.assertEqual(sorted(keys), names)
- for name, value in args.items():
- self.assertEqual(value, 3.14)
-
- def test_tuple_args(self):
- resolver = self._makeOne()
- resolver._tuple_args = ('foo', 'bar')
- names = sorted(resolver._tuple_args)
- kwargs = {}
- for name in names:
- kwargs[name] = 'first,second,third'
- args = resolver.interpret_kwargs(kwargs)[0]
- keys = args.keys()
- self.assertEqual(sorted(keys), names)
- for name, value in args.items():
- self.assertEqual(value, ('first', 'second', 'third'))
-
-class TestFileStorageURIResolver(Base, unittest.TestCase):
-
- def _getTargetClass(self):
- from zodburi.resolvers import FileStorageURIResolver
- return FileStorageURIResolver
-
- def _makeOne(self):
- klass = self._getTargetClass()
- return klass()
-
- def setUp(self):
- import tempfile
- self.tmpdir = tempfile.mkdtemp()
-
- def tearDown(self):
- import shutil
- shutil.rmtree(self.tmpdir)
-
- def test_bool_args(self):
- resolver = self._makeOne()
- f = resolver.interpret_kwargs
- kwargs = f({'read_only':'1'})
- self.assertEqual(kwargs[0], {'read_only':1})
- kwargs = f({'read_only':'true'})
- self.assertEqual(kwargs[0], {'read_only':1})
- kwargs = f({'read_only':'on'})
- self.assertEqual(kwargs[0], {'read_only':1})
- kwargs = f({'read_only':'off'})
- self.assertEqual(kwargs[0], {'read_only':0})
- kwargs = f({'read_only':'no'})
- self.assertEqual(kwargs[0], {'read_only':0})
- kwargs = f({'read_only':'false'})
- self.assertEqual(kwargs[0], {'read_only':0})
-
- @mock.patch('zodburi.resolvers.FileStorage')
- def test_call_no_qs(self, FileStorage):
- resolver = self._makeOne()
- factory, dbkw = resolver('file:///tmp/foo/bar')
- factory()
- FileStorage.assert_called_once_with('/tmp/foo/bar')
+FS_FILENAME = "db.db"
+FS_BLOBDIR = "blob"
- @mock.patch('zodburi.resolvers.FileStorage')
- def test_call_abspath(self, FileStorage):
- resolver = self._makeOne()
- factory, dbkw = resolver('file:///tmp/foo/bar?read_only=true')
- factory()
- FileStorage.assert_called_once_with('/tmp/foo/bar', read_only=1)
- @mock.patch('zodburi.resolvers.FileStorage')
- def test_call_abspath_windows(self, FileStorage):
- resolver = self._makeOne()
- factory, dbkw = resolver(
- 'file://C:\\foo\\bar?read_only=true')
- factory()
- FileStorage.assert_called_once_with('C:\\foo\\bar', read_only=1)
+@pytest.fixture(scope="function")
+def tmpdir():
+ with tempfile.TemporaryDirectory() as tmp:
+ yield tmp
+
+
+@pytest.fixture(scope="function")
+def zconfig_tmpfile():
+ with tempfile.NamedTemporaryFile() as tmp:
+ yield tmp
+
+
+@pytest.fixture(scope="function")
+def zconfig_path(zconfig_tmpfile):
+ yield pathlib.Path(zconfig_tmpfile.name)
+
+
+def _fs_resolver():
+ from zodburi.resolvers import FileStorageURIResolver
+ return FileStorageURIResolver()
+
+
+def _mapping_resolver():
+ from zodburi.resolvers import MappingStorageURIResolver
+ return MappingStorageURIResolver()
+
+
+def _client_resolver():
+ from zodburi.resolvers import ClientStorageURIResolver
+ return ClientStorageURIResolver()
+
+
+def _zconfig_resolver():
+ from zodburi.resolvers import ZConfigURIResolver
+ return ZConfigURIResolver()
+
+def _demo_resolver():
+ from zodburi.resolvers import DemoStorageURIResolver
+ return DemoStorageURIResolver()
+
+
+@pytest.mark.parametrize("factory", [_fs_resolver, _mapping_resolver])
+def test_interpret_kwargs_noargs(factory):
+ resolver = factory()
+
+ new, unused = resolver.interpret_kwargs({})
+
+ assert new == {}
+ assert unused == {}
+
+
+@pytest.mark.parametrize("factory", [_fs_resolver, _mapping_resolver])
+def test_interpret_kwargs_bytesize_args(factory):
+ resolver = factory()
+ names = sorted(resolver._bytesize_args)
+ kwargs = {
+ name: "10MB" for name in names
+ }
+
+ new, unused = resolver.interpret_kwargs(kwargs)
+
+ assert sorted(new) == names
+
+ if new:
+ assert set(new.values()) == {10 * 1024 * 1024}
+
+
+@pytest.mark.parametrize("factory", [_fs_resolver, _mapping_resolver])
+def test_interpret_kwargs_int_args(factory):
+ resolver = factory()
+ names = sorted(resolver._int_args)
+ kwargs = {
+ name: "10" for name in names
+ }
+
+ new, unused = resolver.interpret_kwargs(kwargs)
- @mock.patch('zodburi.resolvers.FileStorage')
- def test_call_normpath(self, FileStorage):
- resolver = self._makeOne()
- factory, dbkw = resolver('file:///tmp/../foo/bar?read_only=true')
+ assert sorted(new) == sorted(new)
+
+ if new:
+ assert set(new.values()) == {10}
+
+
+@pytest.mark.parametrize("factory", [_fs_resolver, _mapping_resolver])
+def test_interpret_kwargs_string_args(factory):
+ resolver = factory()
+ names = sorted(resolver._string_args)
+ kwargs = {
+ name: "string" for name in names
+ }
+
+ new, unused = resolver.interpret_kwargs(kwargs)
+
+ assert sorted(new) == names
+
+ if new:
+ assert set(new.values()) == {"string"}
+
+
+@pytest.mark.parametrize("factory", [_fs_resolver, _mapping_resolver])
+def test_interpret_kwargs_float_args(factory):
+ resolver = factory()
+ resolver._float_args = ("pi", "PI")
+ names = sorted(resolver._float_args)
+ kwargs = {
+ "pi": "3.14",
+ "PI": "3.14",
+ }
+
+ new, unussed = resolver.interpret_kwargs(kwargs)
+
+ assert sorted(new) == names
+
+ if new:
+ assert set(new.values()) == {3.14}
+
+
+@pytest.mark.parametrize("factory", [_fs_resolver, _mapping_resolver])
+def test_interpret_kwargs_tuple_args(factory):
+ resolver = factory()
+ resolver._tuple_args = ("foo", "bar")
+ names = sorted(resolver._tuple_args)
+ kwargs = {
+ name: "first,second,third" for name in names
+ }
+
+ new, unused = resolver.interpret_kwargs(kwargs)
+
+ assert sorted(new) == names
+
+ if new:
+ assert set(new.values()) == {("first", "second", "third")}
+
+
+@pytest.mark.parametrize("passed, expected", [
+ ("1", 1),
+ ("0", 0),
+ ("true", 1),
+ ("false", 0),
+ ("on", 1),
+ ("off", 0),
+ ("yes", 1),
+ ("no", 0),
+])
+@pytest.mark.parametrize("factory", [
+ _fs_resolver,
+ _client_resolver,
+])
+def test_fsresolver_interpres_kwargs_bool_args(factory, passed, expected):
+ resolver = factory()
+ kwargs = {"read_only": passed}
+
+ new, unused = resolver.interpret_kwargs(kwargs)
+
+ assert new == {"read_only": expected}
+
+
+@pytest.mark.parametrize("uri, expected_args, expected_kwargs", [
+ ("file:///tmp/foo/bar", ("/tmp/foo/bar",), {}),
+ (
+ "file:///tmp/foo/bar?read_only=true",
+ ("/tmp/foo/bar",),
+ {"read_only": 1}
+ ),
+ (
+ "file://C:\\foo\\bar?read_only=true",
+ ("C:\\foo\\bar",),
+ {"read_only": 1}
+ ),
+ (
+ "file:///tmp/../foo/bar?read_only=true",
+ ("/foo/bar",),
+ {"read_only": 1}
+ ),
+])
+def test_fsresolver___call___mock_invoke_factory(
+ uri, expected_args, expected_kwargs,
+):
+ resolver = _fs_resolver()
+
+ factory, dbkw = resolver(uri)
+ assert dbkw == {}
+
+ with mock.patch("zodburi.resolvers.FileStorage") as fs_klass:
factory()
- FileStorage.assert_called_once_with('/foo/bar', read_only=1)
-
- def test_invoke_factory_filestorage(self):
- import os
- from ZODB.FileStorage import FileStorage
- self.assertFalse(os.path.exists(os.path.join(self.tmpdir, 'db.db')))
- resolver = self._makeOne()
- factory, dbkw = resolver('file://%s/db.db?quota=200' % self.tmpdir)
- storage = factory()
- self.assertTrue(isinstance(storage, FileStorage))
- try:
- self.assertEqual(storage._quota, 200)
- self.assertTrue(os.path.exists(os.path.join(self.tmpdir, 'db.db')))
- finally:
- storage.close()
-
- def test_invoke_factory_demostorage(self):
- import os
- from ZODB.DemoStorage import DemoStorage
- from ZODB.FileStorage import FileStorage
- resolver = self._makeOne()
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter("always")
- factory, dbkw = resolver(
- 'file://%s/db.db?demostorage=true' % self.tmpdir)
- self.assertEqual(len(w), 1)
- self.assertTrue(issubclass(w[-1].category, DeprecationWarning))
- self.assertIn("demostorage option is deprecated, use demo:// instead", str(w[-1].message))
- storage = factory()
- self.assertTrue(isinstance(storage, DemoStorage))
- self.assertTrue(isinstance(storage.base, FileStorage))
- try:
- self.assertEqual(dbkw, {})
- self.assertTrue(os.path.exists(os.path.join(self.tmpdir, 'db.db')))
- finally:
- storage.close()
-
- def test_invoke_factory_blobstorage(self):
- import os
- from ZODB.blob import BlobStorage
- DB_FILE = os.path.join(self.tmpdir, 'db.db')
- BLOB_DIR = os.path.join(self.tmpdir, 'blob')
- self.assertFalse(os.path.exists(DB_FILE))
- resolver = self._makeOne()
+
+ fs_klass.assert_called_once_with(*expected_args, **expected_kwargs)
+
+
+def test_fsresolver___call___check_dbkw():
+ resolver = _fs_resolver()
+ factory, dbkw = resolver(
+ "file:///tmp/foo/bar"
+ "?connection_pool_size=1"
+ "&connection_cache_size=1"
+ "&database_name=dbname"
+ )
+
+ assert dbkw == {
+ "connection_cache_size": "1",
+ "connection_pool_size": "1",
+ "database_name": "dbname",
+ }
+
+
+def test_fsresolver_invoke_factory(tmpdir):
+ fs_dir = pathlib.Path(tmpdir)
+ db_path = fs_dir / FS_FILENAME
+ resolver = _fs_resolver()
+
+ factory, dbkw = resolver(f"file://{tmpdir}/{FS_FILENAME}?quota=200")
+
+ assert dbkw == {}
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, FileStorage)
+ assert storage._quota == 200
+ assert db_path.exists()
+
+
+def test_fsresolver_invoke_factory_w_demostorage(tmpdir):
+ fs_dir = pathlib.Path(tmpdir)
+ db_path = fs_dir / FS_FILENAME
+ resolver = _fs_resolver()
+
+ with warnings.catch_warnings(record=True) as log:
factory, dbkw = resolver(
- 'file://%s/db.db?quota=200'
- '&blobstorage_dir=%s/blob'
- '&blobstorage_layout=bushy' % (self.tmpdir, quote(self.tmpdir)))
- storage = factory()
- self.assertTrue(isinstance(storage, BlobStorage))
- try:
- self.assertTrue(os.path.exists(DB_FILE))
- self.assertTrue(os.path.exists(BLOB_DIR))
- finally:
- storage.close()
-
- def test_invoke_factory_blobstorage_and_demostorage(self):
- import os
- from ZODB.DemoStorage import DemoStorage
- DB_FILE = os.path.join(self.tmpdir, 'db.db')
- BLOB_DIR = os.path.join(self.tmpdir, 'blob')
- self.assertFalse(os.path.exists(DB_FILE))
- resolver = self._makeOne()
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter("always")
- factory, dbkw = resolver(
- 'file://%s/db.db?quota=200&demostorage=true'
- '&blobstorage_dir=%s/blob'
- '&blobstorage_layout=bushy' % (
- self.tmpdir, quote(
- self.tmpdir
- )))
- self.assertEqual(len(w), 1)
- self.assertTrue(issubclass(w[-1].category, DeprecationWarning))
- self.assertIn("demostorage option is deprecated, use demo:// instead", str(w[-1].message))
- storage = factory()
- self.assertTrue(isinstance(storage, DemoStorage))
- try:
- self.assertTrue(os.path.exists(DB_FILE))
- self.assertTrue(os.path.exists(BLOB_DIR))
- finally:
- storage.close()
-
- def test_dbargs(self):
- resolver = self._makeOne()
+ f"file://{tmpdir}/{FS_FILENAME}?demostorage=true",
+ )
+
+ assert dbkw == {}
+
+ warned, = log
+ assert issubclass(warned.category, DeprecationWarning)
+ assert (
+ "demostorage option is deprecated, use demo:// instead"
+ in str(warned.message)
+ )
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, DemoStorage)
+ assert isinstance(storage.base, FileStorage)
+ assert db_path.exists()
+
+
+def test_fsresolver_invoke_factory_w_blobstorage(tmpdir):
+ fs_dir = pathlib.Path(tmpdir)
+ db_path = fs_dir / FS_FILENAME
+ blob_dir = fs_dir / FS_BLOBDIR
+ quoted = quote(str(blob_dir))
+ resolver = _fs_resolver()
+
+ factory, dbkw = resolver(
+ f"file://{tmpdir}/{FS_FILENAME}?quota=200"
+ f"&blobstorage_dir={quoted}&blobstorage_layout=bushy"
+ )
+
+ assert dbkw == {}
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, BlobStorage)
+ assert blob_dir.exists()
+ assert db_path.exists()
+
+
+def test_fsresolver_invoke_factory_w_blobstorage_and_demostorage(tmpdir):
+ fs_dir = pathlib.Path(tmpdir)
+ db_path = fs_dir / FS_FILENAME
+ blob_dir = fs_dir / FS_BLOBDIR
+ quoted = quote(str(blob_dir))
+ resolver = _fs_resolver()
+
+ with warnings.catch_warnings(record=True) as log:
factory, dbkw = resolver(
- 'file:///tmp/../foo/bar?connection_pool_size=1'
- '&connection_cache_size=1&database_name=dbname')
- self.assertEqual(dbkw, {'connection_cache_size': '1',
- 'connection_pool_size': '1',
- 'database_name': 'dbname'})
-
-
-class TestClientStorageURIResolver(unittest.TestCase):
- def _getTargetClass(self):
- from zodburi.resolvers import ClientStorageURIResolver
- return ClientStorageURIResolver
-
- def _makeOne(self):
- klass = self._getTargetClass()
- return klass()
-
- def test_bool_args(self):
- resolver = self._makeOne()
- f = resolver.interpret_kwargs
- kwargs = f({'read_only':'1'})
- self.assertEqual(kwargs[0], {'read_only':1})
- kwargs = f({'read_only':'true'})
- self.assertEqual(kwargs[0], {'read_only':1})
- kwargs = f({'read_only':'on'})
- self.assertEqual(kwargs[0], {'read_only':1})
- kwargs = f({'read_only':'off'})
- self.assertEqual(kwargs[0], {'read_only':0})
- kwargs = f({'read_only':'no'})
- self.assertEqual(kwargs[0], {'read_only':0})
- kwargs = f({'read_only':'false'})
- self.assertEqual(kwargs[0], {'read_only':0})
-
- @mock.patch('zodburi.resolvers.ClientStorage')
- def test_call_tcp_no_port(self, ClientStorage):
- resolver = self._makeOne()
- factory, dbkw = resolver('zeo://localhost?debug=true')
+ f"file://{tmpdir}/{FS_FILENAME}?quota=200&demostorage=true"
+ f"&blobstorage_dir={quoted}&blobstorage_layout=bushy"
+ )
+
+ assert dbkw == {}
+
+ warned, = log
+ assert issubclass(warned.category, DeprecationWarning)
+ assert (
+ "demostorage option is deprecated, use demo:// instead"
+ in str(warned.message)
+ )
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, DemoStorage)
+ assert isinstance(storage.base, BlobStorage)
+ assert blob_dir.exists()
+ assert db_path.exists()
+
+
+@pytest.mark.parametrize("uri, expected_args, expected_kwargs", [
+ ("zeo://localhost", (("localhost", 9991),), {}),
+ ("zeo://localhost:8080?debug=true", (("localhost", 8080),), {"debug": 1}),
+ ("zeo://[::1]", (("::1", 9991),), {}),
+ ("zeo://[::1]:9990?debug=true", (("::1", 9990),), {"debug": 1}),
+ ("zeo:///var/sock?debug=true", ("/var/sock",), {"debug": 1}),
+ ("zeo:///var/nosuchfile?wait=false", ("/var/nosuchfile",), {"wait": 0}),
+ (
+ (
+ 'zeo:///var/nosuchfile?'
+ 'storage=main&'
+ 'cache_size=1kb&'
+ 'name=foo&'
+ 'client=bar&'
+ 'var=baz&'
+ 'min_disconnect_poll=2&'
+ 'max_disconnect_poll=3&'
+ 'wait_for_server_on_startup=true&'
+ 'wait=4&'
+ 'wait_timeout=5&'
+ 'read_only=6&'
+ 'read_only_fallback=7&'
+ 'drop_cache_rather_verify=true&'
+ 'username=monty&'
+ 'password=python&'
+ 'realm=blat&'
+ 'blob_dir=some/dir&'
+ 'shared_blob_dir=true&'
+ 'blob_cache_size=1kb&'
+ 'blob_cache_size_check=8&'
+ 'client_label=fink&'
+ ),
+ ('/var/nosuchfile',),
+ dict(
+ storage='main',
+ cache_size=1024,
+ name='foo',
+ client='bar',
+ var='baz',
+ min_disconnect_poll=2,
+ max_disconnect_poll=3,
+ wait_for_server_on_startup=1,
+ wait=4,
+ wait_timeout=5,
+ read_only=6,
+ read_only_fallback=7,
+ drop_cache_rather_verify=1,
+ username='monty',
+ password='python',
+ realm='blat',
+ blob_dir='some/dir',
+ shared_blob_dir=1,
+ blob_cache_size=1024,
+ blob_cache_size_check=8,
+ client_label='fink',
+ ),
+ ),
+])
+def test_client_resolver___call___(uri, expected_args, expected_kwargs):
+ resolver = _client_resolver()
+
+ factory, dbkw = resolver(uri)
+
+ assert dbkw == {}
+
+ with mock.patch("zodburi.resolvers.ClientStorage") as cs:
factory()
- ClientStorage.assert_called_once_with(('localhost', 9991), debug=1)
- @mock.patch('zodburi.resolvers.ClientStorage')
- def test_call_tcp(self, ClientStorage):
- resolver = self._makeOne()
- factory, dbkw = resolver('zeo://localhost:8080?debug=true')
- factory()
- ClientStorage.assert_called_once_with(('localhost', 8080), debug=1)
+ cs.assert_called_once_with(*expected_args, **expected_kwargs)
- @mock.patch('zodburi.resolvers.ClientStorage')
- def test_call_ipv6_no_port(self, ClientStorage):
- resolver = self._makeOne()
- factory, dbkw = resolver('zeo://[::1]?debug=true')
- factory()
- ClientStorage.assert_called_once_with(('::1', 9991), debug=1)
+def test_client_resolver___call___check_dbkw():
+ resolver = _client_resolver()
- @mock.patch('zodburi.resolvers.ClientStorage')
- def test_call_ipv6(self, ClientStorage):
- resolver = self._makeOne()
- factory, dbkw = resolver('zeo://[::1]:9090?debug=true')
- factory()
- ClientStorage.assert_called_once_with(('::1', 9090), debug=1)
+ factory, dbkw = resolver(
+ "zeo://localhost:8080?"
+ "connection_pool_size=1&"
+ "connection_cache_size=1&"
+ "database_name=dbname"
+ )
+ assert dbkw == {
+ "connection_pool_size": "1",
+ "connection_cache_size": "1",
+ "database_name": "dbname",
+ }
- @mock.patch('zodburi.resolvers.ClientStorage')
- def test_call_unix(self, ClientStorage):
- resolver = self._makeOne()
- factory, dbkw = resolver('zeo:///var/sock?debug=true')
- factory()
- ClientStorage.assert_called_once_with('/var/sock', debug=1)
-
- @mock.patch('zodburi.resolvers.ClientStorage')
- def test_invoke_factory(self, ClientStorage):
- resolver = self._makeOne()
- factory, dbkw = resolver('zeo:///var/nosuchfile?wait=false')
- storage = factory()
- storage.close()
- ClientStorage.assert_called_once_with('/var/nosuchfile', wait=0)
-
- @mock.patch('zodburi.resolvers.ClientStorage')
- def test_factory_kwargs(self, ClientStorage):
- resolver = self._makeOne()
- factory, dbkw = resolver('zeo:///var/nosuchfile?'
- 'storage=main&'
- 'cache_size=1kb&'
- 'name=foo&'
- 'client=bar&'
- 'var=baz&'
- 'min_disconnect_poll=2&'
- 'max_disconnect_poll=3&'
- 'wait_for_server_on_startup=true&'
- 'wait=4&'
- 'wait_timeout=5&'
- 'read_only=6&'
- 'read_only_fallback=7&'
- 'drop_cache_rather_verify=true&'
- 'username=monty&'
- 'password=python&'
- 'realm=blat&'
- 'blob_dir=some/dir&'
- 'shared_blob_dir=true&'
- 'blob_cache_size=1kb&'
- 'blob_cache_size_check=8&'
- 'client_label=fink&'
- )
- storage = factory()
- storage.close()
- ClientStorage.assert_called_once_with('/var/nosuchfile',
- storage='main',
- cache_size=1024,
- name='foo',
- client='bar',
- var='baz',
- min_disconnect_poll=2,
- max_disconnect_poll=3,
- wait_for_server_on_startup=1,
- wait=4,
- wait_timeout=5,
- read_only=6,
- read_only_fallback=7,
- drop_cache_rather_verify=1,
- username='monty',
- password='python',
- realm='blat',
- blob_dir='some/dir',
- shared_blob_dir=1,
- blob_cache_size=1024,
- blob_cache_size_check=8,
- client_label='fink',
- )
-
-
- @mock.patch('zodburi.resolvers.ClientStorage')
- def test_invoke_factory_demostorage(self, ClientStorage):
- from ZODB.DemoStorage import DemoStorage
- resolver = self._makeOne()
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter("always")
- factory, dbkw = resolver('zeo:///var/nosuchfile?wait=false'
- '&demostorage=true')
- self.assertEqual(len(w), 1)
- self.assertTrue(issubclass(w[-1].category, DeprecationWarning))
- self.assertIn("demostorage option is deprecated, use demo:// instead", str(w[-1].message))
- storage = factory()
- storage.close()
- self.assertTrue(isinstance(storage, DemoStorage))
-
- def test_dbargs(self):
- resolver = self._makeOne()
- factory, dbkw = resolver('zeo://localhost:8080?debug=true&'
- 'connection_pool_size=1&'
- 'connection_cache_size=1&'
- 'database_name=dbname')
- self.assertEqual(dbkw, {'connection_pool_size': '1',
- 'connection_cache_size': '1',
- 'database_name': 'dbname'})
-
-
-class TestZConfigURIResolver(unittest.TestCase):
- def _getTargetClass(self):
- from zodburi.resolvers import ZConfigURIResolver
- return ZConfigURIResolver
-
- def _makeOne(self):
- klass = self._getTargetClass()
- return klass()
-
- def setUp(self):
- import tempfile
- self.tmp = tempfile.NamedTemporaryFile()
-
- def tearDown(self):
- self.tmp.close()
-
- def test_named_storage(self):
- self.tmp.write(b"""
-
-
-
-
-
- """)
- self.tmp.flush()
- resolver = self._makeOne()
- factory, dbkw = resolver('zconfig://%s#bar' % self.tmp.name)
- storage = factory()
- from ZODB.MappingStorage import MappingStorage
- self.assertTrue(isinstance(storage, MappingStorage), storage)
-
- def test_anonymous_storage(self):
- self.tmp.write(b"""
-
-
-
-
-
- """)
- self.tmp.flush()
- resolver = self._makeOne()
- factory, dbkw = resolver('zconfig://%s' % self.tmp.name)
- storage = factory()
- from ZODB.MappingStorage import MappingStorage
- self.assertTrue(isinstance(storage, MappingStorage))
- self.assertEqual(dbkw, {})
-
- def test_query_string_args(self):
- self.tmp.write(b"""
-
-
-
-
-
- """)
- self.tmp.flush()
- resolver = self._makeOne()
- factory, dbkw = resolver('zconfig://%s?foo=bar' % self.tmp.name)
- self.assertEqual(dbkw, {'foo': 'bar'})
-
- def test_storage_not_found(self):
- self.tmp.write(b"""
-
-
- """)
- self.tmp.flush()
- resolver = self._makeOne()
- self.assertRaises(KeyError, resolver, 'zconfig://%s#y' % self.tmp.name)
-
- def test_anonymous_database(self):
- self.tmp.write(b"""
-
-
-
-
- """)
- self.tmp.flush()
- resolver = self._makeOne()
- factory, dbkw = resolver('zconfig://%s' % self.tmp.name)
- storage = factory()
- from ZODB.MappingStorage import MappingStorage
- self.assertTrue(isinstance(storage, MappingStorage))
- self.assertEqual(dbkw,
- {'connection_cache_size': 5000,
- 'connection_cache_size_bytes': 0,
- 'connection_historical_cache_size': 1000,
- 'connection_historical_cache_size_bytes': 0,
- 'connection_historical_pool_size': 3,
- 'connection_historical_timeout': 300,
- 'connection_large_record_size': 16777216,
- 'connection_pool_size': 7})
-
-
- def test_named_database(self):
- self.tmp.write(b"""
-
-
-
- database-name foo
- cache-size 20000
- pool-size 5
-
- """)
- self.tmp.flush()
- resolver = self._makeOne()
- factory, dbkw = resolver('zconfig://%s#x' % self.tmp.name)
- storage = factory()
- from ZODB.MappingStorage import MappingStorage
- self.assertTrue(isinstance(storage, MappingStorage))
- self.assertEqual(dbkw,
- {'connection_cache_size': 20000,
- 'connection_cache_size_bytes': 0,
- 'connection_historical_cache_size': 1000,
- 'connection_historical_cache_size_bytes': 0,
- 'connection_historical_pool_size': 3,
- 'connection_historical_timeout': 300,
- 'connection_large_record_size': 16777216,
- 'connection_pool_size': 5,
- 'database_name': 'foo'})
-
-
- def test_database_all_options(self):
- from zodburi import CONNECTION_PARAMETERS, BYTES_PARAMETERS
- self.tmp.write(("""
-
-
-
- database-name foo
- %s
-
- """ % '\n'.join("{} {}".format(
- name.replace('_', '-'),
- '%sMB' % i if name in BYTES_PARAMETERS else i,
- )
- for (i, name)
- in enumerate(CONNECTION_PARAMETERS)
- )).encode())
- self.tmp.flush()
- resolver = self._makeOne()
- factory, dbkw = resolver('zconfig://%s#x' % self.tmp.name)
- storage = factory()
- from ZODB.MappingStorage import MappingStorage
- self.assertTrue(isinstance(storage, MappingStorage))
- expect = dict(database_name='foo')
- for i, parameter in enumerate(CONNECTION_PARAMETERS):
- cparameter = 'connection_' + parameter
- expect[cparameter] = i
- if parameter in BYTES_PARAMETERS:
- expect[cparameter] *= 1<<20
- self.assertEqual(dbkw, expect)
-
- def test_database_integration_because_ints(self):
- from zodburi import resolve_uri
- self.tmp.write(b"""
-
-
-
-
- """)
- self.tmp.flush()
- from zodburi import resolve_uri
- factory, dbkw = resolve_uri('zconfig://%s' % self.tmp.name)
- storage = factory()
- from ZODB.MappingStorage import MappingStorage
- self.assertTrue(isinstance(storage, MappingStorage))
- self.assertEqual(dbkw,
- {'cache_size': 5000,
- 'cache_size_bytes': 0,
- 'historical_cache_size': 1000,
- 'historical_cache_size_bytes': 0,
- 'historical_pool_size': 3,
- 'historical_timeout': 300,
- 'large_record_size': 16777216,
- 'pool_size': 7,
- 'database_name': 'unnamed'})
-
-
-class TestMappingStorageURIResolver(Base, unittest.TestCase):
-
- def _getTargetClass(self):
- from zodburi.resolvers import MappingStorageURIResolver
- return MappingStorageURIResolver
-
- def _makeOne(self):
- klass = self._getTargetClass()
- return klass()
-
- def test_call_no_qs(self):
- resolver = self._makeOne()
- factory, dbkw = resolver('memory://')
- self.assertEqual(dbkw, {})
- storage = factory()
- from ZODB.MappingStorage import MappingStorage
- self.assertTrue(isinstance(storage, MappingStorage))
- self.assertEqual(storage.__name__, '')
-
- def test_call_with_qs(self):
- uri='memory://storagename?connection_cache_size=100&database_name=fleeb'
- resolver = self._makeOne()
- factory, dbkw = resolver(uri)
- self.assertEqual(dbkw, {'connection_cache_size': '100',
- 'database_name': 'fleeb'})
- storage = factory()
- from ZODB.MappingStorage import MappingStorage
- self.assertTrue(isinstance(storage, MappingStorage))
- self.assertEqual(storage.__name__, 'storagename')
-
-
-class TestDemoStorageURIResolver(unittest.TestCase):
-
- def _getTargetClass(self):
- from zodburi.resolvers import DemoStorageURIResolver
- return DemoStorageURIResolver
-
- def _makeOne(self):
- klass = self._getTargetClass()
- return klass()
-
- def test_invalid_uri_no_match(self):
- from zodburi.resolvers import InvalidDemoStorgeURI
- resolver = self._makeOne()
-
- with pytest.raises(InvalidDemoStorgeURI):
- resolver("bogus:name")
-
- def test_invalid_uri_kwargs_in_base(self):
- from zodburi.resolvers import InvalidDemoStorgeURI
- resolver = self._makeOne()
-
- with pytest.raises(InvalidDemoStorgeURI):
- resolver(
- "demo:"
- "(file:///tmp/blah?pool_size=1234)/"
- "(file:///tmp/qux)"
- )
-
- def test_invalid_uri_kwargs_in_changes(self):
- from zodburi.resolvers import InvalidDemoStorgeURI
- resolver = self._makeOne()
-
- with pytest.raises(InvalidDemoStorgeURI):
- resolver(
- "demo:"
- "(file:///tmp/blah)/"
- "(file:///tmp/qux?pool_size=1234)"
- )
-
- def test_fsoverlay(self):
- import os.path, tempfile, shutil
- tmpdir = tempfile.mkdtemp()
- def _():
- shutil.rmtree(tmpdir)
- self.addCleanup(_)
-
- resolver = self._makeOne()
- basef = os.path.join(tmpdir, 'base.fs')
- changef = os.path.join(tmpdir, 'changes.fs')
- self.assertFalse(os.path.exists(basef))
- self.assertFalse(os.path.exists(changef))
- factory, dbkw = resolver('demo:(file://{})/(file://{}?quota=200)'.format(basef, changef))
- self.assertEqual(dbkw, {})
- demo = factory()
- from ZODB.DemoStorage import DemoStorage
- from ZODB.FileStorage import FileStorage
- self.assertTrue(isinstance(demo, DemoStorage))
- self.assertTrue(isinstance(demo.base, FileStorage))
- self.assertTrue(isinstance(demo.changes, FileStorage))
- self.assertTrue(os.path.exists(basef))
- self.assertTrue(os.path.exists(changef))
- self.assertEqual(demo.changes._quota, 200)
-
- def test_parse_frag(self):
- resolver = self._makeOne()
- factory, dbkw = resolver('demo:(memory://111)/(memory://222)#foo=bar&abc=def')
- self.assertEqual(dbkw, {'foo': 'bar', 'abc': 'def'})
- demo = factory()
- from ZODB.DemoStorage import DemoStorage
- from ZODB.MappingStorage import MappingStorage
- self.assertTrue(isinstance(demo, DemoStorage))
- self.assertTrue(isinstance(demo.base, MappingStorage))
- self.assertEqual(demo.base.__name__, '111')
- self.assertTrue(isinstance(demo.changes, MappingStorage))
- self.assertEqual(demo.changes.__name__, '222')
-
-
-class TestEntryPoints(unittest.TestCase):
-
- def test_it(self):
- from zodburi import resolvers
-
- our_eps = {
- ep.name: ep for ep in distribution("zodburi").entry_points
- }
- expected = [
- ('memory', resolvers.MappingStorageURIResolver),
- ('zeo', resolvers.ClientStorageURIResolver),
- ('file', resolvers.FileStorageURIResolver),
- ('zconfig', resolvers.ZConfigURIResolver),
- ('demo', resolvers.DemoStorageURIResolver),
- ]
- for name, cls in expected:
- target = our_eps[name].load()
- self.assertTrue(isinstance(target, cls))
+
+def test_client_resolver_invoke_factory():
+ resolver = _client_resolver()
+
+ factory, dbkw = resolver("zeo:///var/nosouchfile?wait=false")
+ assert dbkw == {}
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, ClientStorage)
+ # Stub out the storage's server to allow close to complete.
+ storage._server = mock.Mock(spec_set=("close",))
+
+
+def test_client_resolver_invoke_factory_w_demostorage():
+ resolver = _client_resolver()
+
+ with warnings.catch_warnings(record=True) as log:
+ factory, dbkw = resolver(
+ f"zeo:///var/nosuchfile?demostorage=true&wait=false",
+ )
+
+ assert dbkw == {}
+
+ warned, = log
+ assert issubclass(warned.category, DeprecationWarning)
+ assert (
+ "demostorage option is deprecated, use demo:// instead"
+ in str(warned.message)
+ )
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, DemoStorage)
+ assert isinstance(storage.base, ClientStorage)
+ # Stub out the storage's server to allow close to complete.
+ storage.base._server = mock.Mock(spec_set=("close",))
+
+
+def test_zconfig_resolver___call___check_dbkw(zconfig_path):
+ zconfig_path.write_text(
+ """\
+
+
+"""
+ )
+ resolver = _zconfig_resolver()
+ factory, dbkw = resolver(f"zconfig://{zconfig_path}?foo=bar")
+
+ assert dbkw == {"foo": "bar"}
+
+
+def test_zconfig_resolver___call___w_unknown_storage(zconfig_path):
+ zconfig_path.write_text(
+ """\
+
+
+"""
+ )
+ resolver = _zconfig_resolver()
+ with pytest.raises(KeyError):
+ resolver(f"zconfig://{zconfig_path}#y")
+
+
+
+def test_zconfig_resolver_invoke_factory_w_named_storage(zconfig_path):
+ zconfig_path.write_text(
+ """\
+
+
+
+
+
+"""
+ )
+ resolver = _zconfig_resolver()
+ factory, dbkw = resolver(f"zconfig://{zconfig_path}#bar")
+
+ assert dbkw == {}
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, MappingStorage)
+
+
+def test_zconfig_resolver_invoke_factory_w_anonymous_storage(zconfig_path):
+ zconfig_path.write_text(
+ """\
+
+
+
+
+
+"""
+ )
+ resolver = _zconfig_resolver()
+ factory, dbkw = resolver(f"zconfig://{zconfig_path}")
+
+ assert dbkw == {}
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, MappingStorage)
+
+
+def test_zconfig_resolver_invoke_factory_w_anon_db_w_defaults(zconfig_path):
+ zconfig_path.write_text(
+ """\
+
+
+
+
+"""
+ )
+ resolver = _zconfig_resolver()
+ factory, dbkw = resolver(f"zconfig://{zconfig_path}")
+
+ assert dbkw == {
+ "connection_cache_size": 5000,
+ "connection_cache_size_bytes": 0,
+ "connection_historical_cache_size": 1000,
+ "connection_historical_cache_size_bytes": 0,
+ "connection_historical_pool_size": 3,
+ "connection_historical_timeout": 300,
+ "connection_large_record_size": 16777216,
+ "connection_pool_size": 7,
+ }
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, MappingStorage)
+
+
+def test_zconfig_resolver_invoke_factory_w_named_db_w_explicit(zconfig_path):
+ zconfig_path.write_text(
+ """\
+
+
+
+
+ database-name foo
+ cache-size 20000
+ pool-size 5
+
+"""
+ )
+ resolver = _zconfig_resolver()
+ factory, dbkw = resolver(f"zconfig://{zconfig_path}#x")
+
+ assert dbkw == {
+ "database_name": "foo",
+ "connection_cache_size": 20000,
+ "connection_cache_size_bytes": 0,
+ "connection_historical_cache_size": 1000,
+ "connection_historical_cache_size_bytes": 0,
+ "connection_historical_pool_size": 3,
+ "connection_historical_timeout": 300,
+ "connection_large_record_size": 16777216,
+ "connection_pool_size": 5,
+ }
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, MappingStorage)
+
+
+def test_zconfig_resolver_invoke_factory_w_all_options(zconfig_path):
+ from zodburi import CONNECTION_PARAMETERS, BYTES_PARAMETERS
+
+ all_params = [
+ (
+ name.replace("_", "-"),
+ "%sMB" % i if name in BYTES_PARAMETERS else str(i),
+ )
+ for (i, name) in enumerate(CONNECTION_PARAMETERS)
+ ]
+ params_str = "\n".join(
+ [f"{name} {value}" for name, value in all_params]
+ )
+ zconfig_path.write_text(
+ f"""\
+
+
+
+database-name foo
+{params_str}
+
+"""
+ )
+ resolver = _zconfig_resolver()
+ factory, dbkw = resolver(f"zconfig://{zconfig_path}#x")
+
+ expected = {"database_name": "foo"}
+
+ for i, parameter in enumerate(CONNECTION_PARAMETERS):
+ cparameter = f"connection_{parameter}"
+
+ if parameter in BYTES_PARAMETERS:
+ expected[cparameter] = i * 1024 * 1024
+ else:
+ expected[cparameter] = i
+
+ assert dbkw == expected
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, MappingStorage)
+
+
+
+def test_resolve_uri_w_zconfig(zconfig_path):
+ from zodburi import resolve_uri
+
+ zconfig_path.write_text("""\
+
+
+
+
+"""
+ )
+ factory, dbkw = resolve_uri(f"zconfig://{zconfig_path}")
+
+ assert dbkw == {
+ "cache_size": 5000,
+ "cache_size_bytes": 0,
+ "historical_cache_size": 1000,
+ "historical_cache_size_bytes": 0,
+ "historical_pool_size": 3,
+ "historical_timeout": 300,
+ "large_record_size": 16777216,
+ "pool_size": 7,
+ "database_name": "unnamed",
+ }
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, MappingStorage)
+
+
+def test_mapping_resolver_wo_qs():
+ resolver = _mapping_resolver()
+
+ factory, dbkw = resolver("memory://")
+
+ assert dbkw == {}
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, MappingStorage)
+ assert storage.__name__ == ""
+
+
+def test_mapping_resolver_w_qs():
+ resolver = _mapping_resolver()
+
+ factory, dbkw = resolver(
+ "memory://storagename?connection_cache_size=100&database_name=fleeb"
+ )
+
+ assert dbkw == {
+ "connection_cache_size": "100",
+ "database_name": "fleeb"
+ }
+
+ with contextlib.closing(factory()) as storage:
+ assert isinstance(storage, MappingStorage)
+ assert storage.__name__ == "storagename"
+
+
+def test_demo_resolver_w_invalid_uri_no_match():
+ from zodburi.resolvers import InvalidDemoStorgeURI
+
+ resolver = _demo_resolver()
+
+ with pytest.raises(InvalidDemoStorgeURI):
+ resolver("bogus:name")
+
+
+def test_demo_resolver_w_invalid_uri_kwargs_in_base():
+ from zodburi.resolvers import InvalidDemoStorgeURI
+
+ resolver = _demo_resolver()
+
+ with pytest.raises(InvalidDemoStorgeURI):
+ resolver(
+ "demo:"
+ "(file:///tmp/blah?pool_size=1234)/"
+ "(file:///tmp/qux)"
+ )
+
+
+def test_demo_resolver_w_invalid_uri_kwargs_in_changes():
+ from zodburi.resolvers import InvalidDemoStorgeURI
+
+ resolver = _demo_resolver()
+
+ with pytest.raises(InvalidDemoStorgeURI):
+ resolver(
+ "demo:"
+ "(file:///tmp/blah)/"
+ "(file:///tmp/qux?pool_size=1234)"
+ )
+
+
+def test_demo_resolver_invoke_factory_w_fs_overlay(tmpdir):
+ fs_dir = pathlib.Path(tmpdir)
+ base_path = fs_dir / "base.fs"
+ changes_path = fs_dir / "changes.fs"
+ assert not base_path.exists()
+ assert not changes_path.exists()
+ demo_uri = (
+ f"demo:"
+ f"(file://{base_path})/"
+ f"(file://{changes_path}"
+ f"?quota=200)"
+ )
+
+ resolver = _demo_resolver()
+
+ factory, dbkw = resolver(demo_uri)
+
+ assert dbkw == {}
+
+ with contextlib.closing(factory()) as demo:
+ assert isinstance(demo, DemoStorage)
+ assert isinstance(demo.base, FileStorage)
+ assert isinstance(demo.changes, FileStorage)
+ assert base_path.exists()
+ assert changes_path.exists()
+ assert demo.changes._quota == 200
+
+
+def test_demo_resolver_invoke_factory_w_qs_parms(tmpdir):
+ fs_dir = pathlib.Path(tmpdir)
+ base_path = fs_dir / "base.fs"
+ changes_path = fs_dir / "changes.fs"
+ assert not base_path.exists()
+ assert not changes_path.exists()
+ demo_uri = "demo:(memory://111)/(memory://222)#foo=bar&abc=def"
+
+ resolver = _demo_resolver()
+
+ factory, dbkw = resolver(demo_uri)
+
+ assert dbkw == {'foo': 'bar', 'abc': 'def'}
+
+ with contextlib.closing(factory()) as demo:
+ assert isinstance(demo, DemoStorage)
+ assert isinstance(demo.base, MappingStorage)
+ demo.base.__name__ == '111'
+ assert isinstance(demo.changes, MappingStorage)
+ assert demo.changes.__name__ == '222'
+
+
+def test_entry_points():
+ from zodburi import resolvers
+
+ our_eps = {
+ ep.name: ep for ep in distribution("zodburi").entry_points
+ }
+ expected = [
+ ('memory', resolvers.MappingStorageURIResolver),
+ ('zeo', resolvers.ClientStorageURIResolver),
+ ('file', resolvers.FileStorageURIResolver),
+ ('zconfig', resolvers.ZConfigURIResolver),
+ ('demo', resolvers.DemoStorageURIResolver),
+ ]
+ for name, cls in expected:
+ target = our_eps[name].load()
+ assert isinstance(target, cls)