diff --git a/docs/util.rst b/docs/util.rst index 71606a6c..0e75c824 100644 --- a/docs/util.rst +++ b/docs/util.rst @@ -181,6 +181,82 @@ Documentation :inherited-members: +Cyclic Counter Values +------------------------------ +There are classes provided to produce and to track cyclic counter values + +Ring Counter +^^^^^^^^^^^^^^^^^^ +Counter which can increase / decrease and will wrap around when reaching the maximum / minimum value. + +.. exec_code:: + + from HABApp.util import RingCounter + + # Ring counter that allows 11 values (0..10) + RingCounter(10) + # Same as + RingCounter(0, 10) + + c = RingCounter(2, 5, initial_value=2) + for _ in range(4): + c.increase() # increase by 1 + print(c.value) # get the value through the property + for _ in range(4): + c += 1 # increase by 1 + print(int(c)) # casting to int returns the current value + + # Compare works out of the box + print(f'== 2: {c == 2}') + print(f'>= 2: {c >= 2}') + + +Ring Counter Tracker +^^^^^^^^^^^^^^^^^^^^ + +Tracke which tracks a ring counter value and only allows increasing / decreasing values + +.. exec_code:: + hide-output + + from HABApp.util import RingCounterTracker + + # Tracker that allows 101 values (0..100) with a 10 value ignore region + RingCounterTracker(100) + # Same as + c = RingCounterTracker(0, 100) + + assert c.allow(50) # First value is always allowed + assert not c.allow(50) # Same value again is not allowed since it's not increasing + assert not c.allow(41) # Value in the ignore region is not allowed + assert c.test_allow(40) # Value out of the ignore region is allowed + + assert c.allow(100) + assert c.allow(5) # Value is allowed since it wraps around and is increasing + assert not c.allow(100) # Ignore interval wraps properly around, too + assert not c.allow(97) + assert c.allow(96) # Highest value out of the ignore interval is allowed again + + # Compare works out of the box + print(f'== 5: {c == 5}') + print(f'>= 5: {c >= 5}') + + # Last accepted value + print(f'Last value: {c.value:d}') + + +Documentation +^^^^^^^^^^^^^^^^^^ + +.. autoclass:: HABApp.util.RingCounter + :members: + :inherited-members: + +.. autoclass:: HABApp.util.RingCounterTracker + :members: + :inherited-members: + + Statistics ------------------------------ diff --git a/src/HABApp/util/__init__.py b/src/HABApp/util/__init__.py index 8b56fd5a..c678b056 100644 --- a/src/HABApp/util/__init__.py +++ b/src/HABApp/util/__init__.py @@ -1,8 +1,7 @@ -from .threshold import Threshold -from .statistics import Statistics -from .listener_groups import EventListenerGroup -from .fade import Fade -from .rate_limiter import RateLimiter - -from . import functions -from . import multimode +from HABApp.util import functions, multimode +from HABApp.util.fade import Fade +from HABApp.util.listener_groups import EventListenerGroup +from HABApp.util.rate_limiter import RateLimiter +from HABApp.util.ring_counter import RingCounter, RingCounterTracker +from HABApp.util.statistics import Statistics +from HABApp.util.threshold import Threshold diff --git a/src/HABApp/util/ring_counter/__init__.py b/src/HABApp/util/ring_counter/__init__.py new file mode 100644 index 00000000..fa0985b8 --- /dev/null +++ b/src/HABApp/util/ring_counter/__init__.py @@ -0,0 +1,2 @@ +from .ring_counter import RingCounter +from .ring_counter_tracker import RingCounterTracker diff --git a/src/HABApp/util/ring_counter/ring_counter.py b/src/HABApp/util/ring_counter/ring_counter.py new file mode 100644 index 00000000..98d563f3 --- /dev/null +++ b/src/HABApp/util/ring_counter/ring_counter.py @@ -0,0 +1,109 @@ +from typing import Final + +from typing_extensions import Self + + +class RingCounter: + """A ring counter is a counter that wraps around when it reaches its maximum value. + """ + + __slots__ = ('_max_value', '_min_value', '_value') + + def __init__(self, min_value: int | None = None, max_value: int | None = None, *, + initial_value: int | None = None) -> None: + if max_value is None: + max_value = min_value + min_value = 0 + + if initial_value is None: + initial_value = min_value + + if not isinstance(min_value, int): + msg = 'Min value must be an integer' + raise TypeError(msg) + if not isinstance(max_value, int): + msg = 'Max value must be an integer' + raise TypeError(msg) + if not isinstance(initial_value, int): + msg = 'Initial value must be an integer' + raise TypeError(msg) + + if not min_value < max_value: + msg = f'Min value {min_value} must be less than max value {max_value}' + raise ValueError(msg) + if not min_value <= initial_value <= max_value: + msg = f'Initial value {initial_value} is not in range [{min_value:d}..{max_value:d}]' + raise ValueError(msg) + + self._min_value: Final = min_value + self._max_value: Final = max_value + self._value: int = initial_value + + @property + def size(self) -> int: + """Return the size (how man values it can count) of the ring counter.""" + return self._max_value - self._min_value + 1 + + def __len__(self) -> int: + return self.size + + @property + def value(self) -> int: + """Current value of the ring counter.""" + return self._value + + def __int__(self) -> int: + return self._value + + def increase(self, value: int = 1) -> Self: + """Increase the value of the ring counter by the given value.""" + if value < 0: + msg = 'Value must be >= 0' + raise ValueError(msg) + + self._value += value + + while self._value > self._max_value: + self._value -= self.size + + return self + + def decrease(self, value: int = 1) -> Self: + """Decrease the value of the ring counter by the given value.""" + if value < 0: + msg = 'Value must be >= 0' + raise ValueError(msg) + + self._value -= value + + while self._value < self._min_value: + self._value += self.size + + return self + + def __iadd__(self, other: int) -> Self: + return self.increase(other) + + def __isub__(self, other: int) -> Self: + return self.decrease(other) + + def __repr__(self) -> str: + return f'{self.__class__.__name__}(value={self._value:d}, min={self._min_value:d} max={self._max_value:d})' + + def __eq__(self, other: int) -> bool: + return self._value == other + + def __ne__(self, other: float) -> bool: + return self._value != other + + def __ge__(self, other: float) -> bool: + return self._value >= other + + def __gt__(self, other: float) -> bool: + return self._value > other + + def __le__(self, other: float) -> bool: + return self._value <= other + + def __lt__(self, other: float) -> bool: + return self._value < other diff --git a/src/HABApp/util/ring_counter/ring_counter_tracker.py b/src/HABApp/util/ring_counter/ring_counter_tracker.py new file mode 100644 index 00000000..d731b182 --- /dev/null +++ b/src/HABApp/util/ring_counter/ring_counter_tracker.py @@ -0,0 +1,132 @@ +from typing import Final, Literal + + +class RingCounterTracker: + """Class that tracks a ring counter value and only allows increasing values. + """ + + __slots__ = ('_ignore', '_is_increasing', '_last_value', '_max_value', '_min_value') + + def __init__(self, min_value: int | None = None, max_value: int | None = None, *, + ignore: int = 10, direction: Literal['increasing', 'decreasing'] = 'increasing') -> None: + """ + + :param min_value: Minimum value of the ring counter + :param max_value: Maximum value of the ring counter + :param ignore: How many values to ignore before allowing a lower value + :param direction: Direction of the counter: increasing or decreasing + """ + if max_value is None: + max_value = min_value + min_value = 0 + + if not isinstance(min_value, int): + msg = 'Min value must be an integer' + raise TypeError(msg) + if not isinstance(max_value, int): + msg = 'Max value must be an integer' + raise TypeError(msg) + if not isinstance(ignore, int): + msg = 'Ignore value must be an integer' + raise TypeError(msg) + + if not min_value < max_value: + msg = f'Min value {min_value} must be less than max value {max_value}' + raise ValueError(msg) + if not 0 < ignore < (max_value - min_value + 1): + msg = f'Ignore value {ignore} must be greater than 0 and lower than max value {max_value}' + raise ValueError(msg) + + self._min_value: Final = min_value + self._max_value: Final = max_value + self._ignore: Final = ignore + self._is_increasing: Final = direction != 'decreasing' + + # # initialize so we always allow on first call + self._last_value: int = (max_value + ignore + 2) if self._is_increasing else (min_value - ignore - 2) + + @property + def value(self) -> int: + """Get the last value of the ring counter.""" + return self._last_value + + def __int__(self) -> int: + return self._last_value + + def allow(self, value: int, *, strict: bool = True, set_value: bool = True) -> bool: + """Return if a value is allowed and set it as the current value if it was allowed. + + :param value: Value to be checked + :param strict: Check if the value is within min/max and of correct type + :param set_value: Set the new value as the current value if it was allowed + :return: True if the value was allowed, False if not + """ + if strict and not isinstance(value, int): + msg = f'Value must be an integer (is {type(value)})' + raise TypeError(msg) + + if value > (max_value := self._max_value): + if strict: + msg = f'Value {value} is greater than max value {self._max_value}' + raise ValueError(msg) + return False + + if value < (min_value := self._min_value): + if strict: + msg = f'Value {value} is less than min value {self._min_value}' + raise ValueError(msg) + return False + + if self._is_increasing: + lowest_ignored = (last_value := self._last_value) - self._ignore + 1 + if lowest_ignored <= value <= last_value: + return False + if lowest_ignored < min_value: + lowest_ignored += (max_value - min_value + 1) + if lowest_ignored <= value <= max_value: + return False + else: + highest_ignored = (last_value := self._last_value) + self._ignore - 1 + if last_value <= value <= highest_ignored: + return False + if highest_ignored > max_value: + highest_ignored -= (max_value - min_value + 1) + if min_value <= value <= highest_ignored: + return False + + if set_value: + self._last_value = value + return True + + def test_allow(self, value: int) -> bool: + """Test if a value will be allowed without setting it as the current value. + + :param value: value to test + :return: True if the value would be allowed, False if not + """ + return self.allow(value, strict=False, set_value=False) + + def __repr__(self) -> str: + value = self._last_value + if value > self._max_value or value < self._min_value: + value = '-' + return (f'{self.__class__.__name__:s}(value={value}, ' + f'min={self._min_value:d}, max={self._max_value:d}, ignore={self._ignore:d})') + + def __eq__(self, other: int) -> bool: + return self._last_value == other + + def __ne__(self, other: float) -> bool: + return self._last_value != other + + def __ge__(self, other: float) -> bool: + return self._last_value >= other + + def __gt__(self, other: float) -> bool: + return self._last_value > other + + def __le__(self, other: float) -> bool: + return self._last_value <= other + + def __lt__(self, other: float) -> bool: + return self._last_value < other diff --git a/tests/test_utils/test_ring_counter.py b/tests/test_utils/test_ring_counter.py new file mode 100644 index 00000000..2230b92b --- /dev/null +++ b/tests/test_utils/test_ring_counter.py @@ -0,0 +1,155 @@ +import pytest + +from HABApp.util.ring_counter import RingCounter, RingCounterTracker + + +def test_ring_counter(): + c = RingCounter(10) + assert c == 0 + assert c.size == 11 + assert len(c) == 11 + + c.increase(10) + assert c == 10 + assert c.value == 10 + assert int(c) == 10 + + c.increase(2) + assert c == 1 + + c.increase(12) + assert c == 2 + + c = RingCounter(2, 10, initial_value=9) + assert c == 9 + + c.increase(2) + assert c == 2 + + c.increase(18) + assert c == 2 + + c += 1 + assert c == 3 + + c -= 4 + assert c == 8 + + +def test_ring_counter_compare(): + c = RingCounter(10, initial_value=5) + assert c == 5 + + assert c > 4.9 + assert c >= 5 + assert c >= 4.9999 + assert c < 6 + assert c <= 5.001 + assert c != 6 + + +def test_repr(): + assert str(RingCounter(10)) == 'RingCounter(value=0, min=0 max=10)' + assert str(RingCounter(1, 3, initial_value=2)) == 'RingCounter(value=2, min=1 max=3)' + + c = RingCounterTracker(100) + assert str(c) == 'RingCounterTracker(value=-, min=0, max=100, ignore=10)' + c.allow(10) + assert str(c) == 'RingCounterTracker(value=10, min=0, max=100, ignore=10)' + + +@pytest.mark.parametrize('direction', ('something', 'decreasing')) +def test_ring_counter_init(direction): + assert RingCounterTracker(100, direction=direction).allow(0) + assert RingCounterTracker(100, direction=direction).allow(1) + assert RingCounterTracker(100, direction=direction).allow(99) + assert RingCounterTracker(100, direction=direction).allow(100) + + +def test_ring_counter_tracker_increase(): + tracker = RingCounterTracker(100) + + ctr = RingCounter(100, initial_value=10) + c_false = RingCounter(100, initial_value=1) + c_true = RingCounter(100, initial_value=0) + + values = set() + for _ in range(1_000): + assert tracker.allow(ctr.value) + assert not tracker.allow(c_false.value) + assert tracker.test_allow(c_true.value) + for c in (ctr, c_false, c_true): + c.increase() + values.add(tracker.value) + + assert values == set(range(101)) + + # Test with lower boundary + tracker = RingCounterTracker(-10, 10) + + ctr = RingCounter(-10, 10, initial_value=10) + c_false = RingCounter(-10, 10, initial_value=1) + c_true = RingCounter(-10, 10, initial_value=0) + + values = set() + for _ in range(1_000): + assert tracker.allow(ctr.value) + assert not tracker.allow(c_false.value) + assert tracker.test_allow(c_true.value) + for c in (ctr, c_false, c_true): + c.increase() + values.add(tracker.value) + + assert values == set(range(-10, 11)) + + + + +def test_ring_counter_tracker_decrease(): + tracker = RingCounterTracker(100, direction='decreasing') + + ctr = RingCounter(100, initial_value=20) + c_false = RingCounter(100, initial_value=29) + c_true = RingCounter(100, initial_value=30) + + values = set() + for _ in range(1_000): + assert tracker.allow(ctr.value) + assert not tracker.allow(c_false.value) + assert tracker.test_allow(c_true.value) + for c in (ctr, c_false, c_true): + c.decrease() + values.add(tracker.value) + + assert values == set(range(101)) + + # Test with lower boundary + tracker = RingCounterTracker(20, 40, direction='decreasing') + + ctr = RingCounter(20, 40, initial_value=20) + c_false = RingCounter(20, 40, initial_value=29) + c_true = RingCounter(20, 40, initial_value=30) + + values = set() + for _ in range(1_000): + assert tracker.allow(ctr.value) + assert not tracker.allow(c_false.value) + assert tracker.test_allow(c_true.value) + for c in (ctr, c_false, c_true): + c.decrease() + values.add(tracker.value) + + assert values == set(range(20, 41)) + + +def test_ring_counter_tracker_compare(): + c = RingCounterTracker(100) + c.allow(5) + assert c == 5 + + assert c > 4.9 + assert c >= 5 + assert c >= 4.9999 + assert c < 6 + assert c <= 5.001 + assert c != 6