Skip to content

Commit

Permalink
refactor: habitability, pytest idioms
Browse files Browse the repository at this point in the history
  • Loading branch information
tseaver committed May 4, 2024
1 parent 02eb413 commit efd44fb
Show file tree
Hide file tree
Showing 5 changed files with 907 additions and 771 deletions.
34 changes: 21 additions & 13 deletions zodburi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,41 @@
)

BYTES_PARAMETERS = (
'cache_size_bytes',
'historical_cache_size_bytes',
'large_record_size'
"cache_size_bytes",
"historical_cache_size_bytes",
"large_record_size"
)

PARAMETERS = dict(
[('database_name', 'database_name')] +
[(f'connection_{parm}', parm) for parm in CONNECTION_PARAMETERS]
[("database_name", "database_name")] +
[(f"connection_{parm}", parm) for parm in CONNECTION_PARAMETERS]
)

HAS_UNITS_RE = re.compile(r'\s*(\d+)\s*([kmg])b\s*$')
HAS_UNITS_RE = re.compile(r"\s*(\d+)\s*([kmg])b\s*$")
UNITS = dict(k=1<<10, m=1<<20, g=1<<30)


_DEFAULT_DBKW = {
'cache_size': 10000,
'pool_size': 7,
'database_name': 'unnamed',
"cache_size": 10000,
"pool_size": 7,
"database_name": "unnamed",
}


class NoResolverForScheme(KeyError):
def __init__(self, uri):
self.uri = uri
super().__init__(f'No resolver found for uri: {uri}')
super().__init__(f"No resolver found for uri: {uri}")


class UnknownDatabaseKeywords(KeyError):
def __init__(self, kw):
self.kw = kw
super().__init__(
f'Unrecognized database keyword(s): {", ".join(kw)}'
f"Unrecognized database keyword(s): {", ".join(kw)}"
)


def resolve_uri(uri):
"""
Returns a tuple, (factory, dbkw) where factory is a no-arg callable which
Expand All @@ -54,9 +58,10 @@ def resolve_uri(uri):
factory, dbkw = _resolve_uri(uri)
return factory, _get_dbkw(dbkw)


# _resolve_uri serves resolve_uri: it returns factory and original raw dbkw.
def _resolve_uri(uri):
scheme = uri[:uri.find(':')]
scheme = uri[:uri.find(":")]
try:
resolver_eps = entry_points(group="zodburi.resolvers")
except TypeError: # pragma: NO COVER Python < 3.10
Expand All @@ -70,21 +75,24 @@ def _resolve_uri(uri):
else:
raise NoResolverForScheme(uri)


def _parse_bytes(s):
m = HAS_UNITS_RE.match(s.lower())

if m:
v, uname = m.group(1, 2)
return int(v) * UNITS[uname]
else:
return int(s)


def _get_dbkw(kw):
dbkw = _DEFAULT_DBKW.copy()

for parameter in PARAMETERS:
if parameter in kw:
v = kw.pop(parameter)
if parameter.startswith('connection_'):
if parameter.startswith("connection_"):
if not isinstance(v, int):
if PARAMETERS[parameter] in BYTES_PARAMETERS:
v = _parse_bytes(v)
Expand Down
68 changes: 46 additions & 22 deletions zodburi/datatypes.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,67 @@
TRUETYPES = ('1', 'on', 'true', 't', 'yes')
FALSETYPES = ('', '0', 'off', 'false', 'f', 'no')


class SuffixLengthMismatch(ValueError):
def __init__(self, d):
self.d = d
super().__init__("All suffix keys must have the same length")


class SuffixMultiplier:
# d is a dictionary of suffixes to integer multipliers. If no suffixes
# match, default is the multiplier. Matches are case insensitive. Return
# values are in the fundamental unit.
"""Convert integer-like strings w/ size suffixes to integers
- 'd' is a dictionary of suffixes to integer multipliers.
- 'default' is the multiplier if no suffixes match.
Matches are case insensitive.
Returned values are in the fundamental unit.
"""
def __init__(self, d, default=1):
self._d = d
self._default = default
# all keys must be the same size
self._keysz = None
for k in d.keys():
if self._keysz is None:
self._keysz = len(k)
else:
if self._keysz != len(k):
raise ValueError('suffix length missmatch')
sizes = set(len(key) for key in d)

if len(sizes) > 1:
raise SuffixLengthMismatch(d)

self._d = {key.lower(): value for key, value in d.items()}
self._default = default
self._keysz = sizes.pop() if sizes else 0

def __call__(self, v):
v = v.lower()
for s, m in self._d.items():
if v[-self._keysz:] == s:
return int(v[:-self._keysz]) * m
return int(v) * self._default
if self._keysz and len(v) > self._keysz:
v = v.lower()
suffix = v[-self._keysz:]
multiplier = self._d.get(suffix, self._default)

if multiplier is not self._default:
v = v[:-self._keysz]
else:
multiplier = self._default

return int(v) * multiplier


convert_bytesize = SuffixMultiplier({'kb': 1024,
'mb': 1024*1024,
'gb': 1024*1024*1024,
})
convert_bytesize = SuffixMultiplier({
"kb": 1024,
"mb": 1024*1024,
"gb": 1024*1024*1024,
})


def convert_int(value):
# boolean values are also treated as integers
value = value.lower()

if value in FALSETYPES:
return 0

if value in TRUETYPES:
return 1

return int(value)


def convert_tuple(value):
return tuple(value.split(','))
return tuple(value.split(","))
2 changes: 1 addition & 1 deletion zodburi/tests/test___init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import pytest

import zodburi
from zodburi import resolvers


@pytest.mark.parametrize("source, expected", [
Expand All @@ -24,6 +23,7 @@ def _expected_dbkw(**kw):
dbkw.update(kw)
return dbkw


@pytest.mark.parametrize("kw, expected", [
({}, _expected_dbkw()),
({"database_name": "foo"}, _expected_dbkw(database_name="foo")),
Expand Down
170 changes: 97 additions & 73 deletions zodburi/tests/test_datatypes.py
Original file line number Diff line number Diff line change
@@ -1,102 +1,126 @@
import unittest

import pytest

_marker = object()
class SuffixMultiplierTests(unittest.TestCase):

def _getTargetClass(self):
from zodburi.datatypes import SuffixMultiplier
return SuffixMultiplier

def _makeOne(self, d=None, default=_marker):
if d is None:
d = {}
if default is _marker:
return self._getTargetClass()(d)
return self._getTargetClass()(d, default)
def _suffix_multiplier(d=None, default=_marker):
from zodburi.datatypes import SuffixMultiplier

if d is None:
d = {}

if default is _marker:
return SuffixMultiplier(d)

return SuffixMultiplier(d, default)


def test_suffixmultiplier___init___w_defaults():
sm = _suffix_multiplier()
assert sm._d == {}
assert sm._default == 1
assert sm._keysz == 0


def test_suffixmultiplier___init___w_explicit_default():
sm = _suffix_multiplier(default=3)
assert sm._d == {}
assert sm._default == 3
assert sm._keysz == 0


def test_suffixmultiplier___init___w_normal_suffixes():
SFX = {"aaa": 2, "bbb": 3}
sm = _suffix_multiplier(SFX)
assert sm._d == SFX
assert sm._default == 1
assert sm._keysz == 3


def test_suffixmultiplier___init___w_mismatched_suffixes():
SFX = {"aaa": 2, "bbbb": 3}

with pytest.raises(ValueError):
_suffix_multiplier(SFX)


def test_suffixmultiplier___call____miss():
SFX = {"aaa": 2, "bbb": 3}
sm = _suffix_multiplier(SFX)
assert sm("14") == 14


def test_suffixmultiplier___call___hit():
SFX = {"aaa": 2, "bbb": 3}
sm = _suffix_multiplier(SFX)
assert sm("14bbb") == 42


def test_convert_bytesize_miss():
from zodburi.datatypes import convert_bytesize

assert convert_bytesize("14") == 14

def test_ctor_simple(self):
sm = self._makeOne()
self.assertEqual(sm._d, {})
self.assertEqual(sm._default, 1)
self.assertEqual(sm._keysz, None)

def test_ctor_w_explicit_default(self):
sm = self._makeOne(default=3)
self.assertEqual(sm._default, 3)
@pytest.mark.parametrize("sized, expected", [
("14", 14),
("200", 200),
("14kb", 14 * 1024),
("14mb", 14 * 1024 * 1024),
("14gb", 14 * 1024 * 1024 * 1024),
])
def test_convert_bytesize_hit(sized, expected):
from zodburi.datatypes import convert_bytesize

def test_ctor_w_normal_suffixes(self):
SFX = {'aaa': 2, 'bbb': 3}
sm = self._makeOne(SFX)
self.assertEqual(sm._d, SFX)
self.assertEqual(sm._default, 1)
self.assertEqual(sm._keysz, 3)
assert convert_bytesize(sized) == expected

def test_ctor_w_mismatched_suffixes(self):
SFX = {'aaa': 2, 'bbbb': 3}
self.assertRaises(ValueError, self._makeOne, SFX)

def test___call____miss(self):
SFX = {'aaa': 2, 'bbb': 3}
sm = self._makeOne(SFX)
self.assertEqual(sm('14'), 14)
def test_convert_int_w_falsetypes():
from zodburi.datatypes import convert_int
from zodburi.datatypes import FALSETYPES

def test___call____hit(self):
SFX = {'aaa': 2, 'bbb': 3}
sm = self._makeOne(SFX)
self.assertEqual(sm('14bbb'), 42)
for v in FALSETYPES:
assert convert_int(v) == 0
assert convert_int(v.title()) == 0


class Test_convert_bytesize(unittest.TestCase):
def test_convert_int_w_truetypes():
from zodburi.datatypes import convert_int
from zodburi.datatypes import TRUETYPES

def _callFUT(self, value):
from zodburi.datatypes import convert_bytesize
return convert_bytesize(value)
for v in TRUETYPES:
assert convert_int(v) == 1
assert convert_int(v.title()) == 1

def test_hit(self):
self.assertEqual(self._callFUT('14kb'), 14 * 1024)
self.assertEqual(self._callFUT('14mb'), 14 * 1024 * 1024)
self.assertEqual(self._callFUT('14gb'), 14 * 1024 * 1024 * 1024)

def test_miss(self):
self.assertEqual(self._callFUT('14'), 14)
def test_convert_int_w_normal():
from zodburi.datatypes import convert_int

assert convert_int("14") == 14

class Test_convert_int(unittest.TestCase):

def _callFUT(self, value):
from zodburi.datatypes import convert_int
return convert_int(value)
def test_convert_int_w_invalid():
from zodburi.datatypes import convert_int

def test_hit_falsetypes(self):
from zodburi.datatypes import FALSETYPES
for v in FALSETYPES:
self.assertEqual(self._callFUT(v), 0)
self.assertEqual(self._callFUT(v.title()), 0)
with pytest.raises(ValueError):
convert_int("notanint")

def test_hit_truetypes(self):
from zodburi.datatypes import TRUETYPES
for v in TRUETYPES:
self.assertEqual(self._callFUT(v), 1)
self.assertEqual(self._callFUT(v.title()), 1)

def test_hit_normal(self):
self.assertEqual(self._callFUT('14'), 14)
def test_convert_tuple_w_empty():
from zodburi.datatypes import convert_tuple

def test_miss(self):
self.assertRaises(ValueError, self._callFUT, 'notanint')
assert convert_tuple("") == ("",)


class Test_convert_tuple(unittest.TestCase):
def test_convert_tuple_wo_commas():
from zodburi.datatypes import convert_tuple

def _callFUT(self, value):
from zodburi.datatypes import convert_tuple
return convert_tuple(value)
assert convert_tuple("abc") == ("abc",)

def test_empty(self):
self.assertEqual(self._callFUT(''), ('',))

def test_wo_commas(self):
self.assertEqual(self._callFUT('abc'), ('abc',))
def test_convert_tuple_w_commas():
from zodburi.datatypes import convert_tuple

def test_w_commas(self):
self.assertEqual(self._callFUT('abc,def'), ('abc', 'def'))
assert convert_tuple("abc,def") == ("abc", "def")
Loading

0 comments on commit efd44fb

Please sign in to comment.