Skip to content

Commit

Permalink
Fixed habapp tests
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemanspiff2007 committed Dec 5, 2024
1 parent 38e8905 commit 681cef8
Show file tree
Hide file tree
Showing 27 changed files with 717 additions and 535 deletions.
5 changes: 5 additions & 0 deletions .ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,8 @@ lines-after-imports = 2 # https://docs.astral.sh/ruff/settings/#lint_isort_l
"interface_*.py" = [
"F401" # F401 [*] {name} imported but unused
]


"run/*" = [
"S101" # Use of assert detected
]
12 changes: 7 additions & 5 deletions run/conf_testing/lib/HABAppTests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from .utils import get_random_name, run_coro, find_astro_sun_thing, get_bytes_text
from .test_data import get_openhab_test_events, get_openhab_test_states, get_openhab_test_types
from .utils import find_astro_sun_thing, get_random_name


# isort: split

from .test_rule import TestBaseRule, TestResult
from .event_waiter import EventWaiter
from .item_waiter import ItemWaiter
from .openhab_tmp_item import OpenhabTmpItem

from .test_data import get_openhab_test_events, get_openhab_test_states, get_openhab_test_types
from .openhab_tmp_item import AsyncOpenhabTmpItem, OpenhabTmpItem
from .test_rule import TestBaseRule, TestResult, TestRunnerRule
15 changes: 11 additions & 4 deletions run/conf_testing/lib/HABAppTests/compare_values.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from .utils import get_bytes_text
from binascii import b2a_hex


def get_equal_text(value1, value2) -> str:
return f'{get_value_text(value1)} {"==" if value1 == value2 else "!="} {get_value_text(value2)}'
def get_bytes_text(value: object) -> object:
if isinstance(value, bytes) and len(value) > 300:
return b2a_hex(value[:40]).decode() + ' ... ' + b2a_hex(value[-40:]).decode()
return value


def get_value_text(value) -> str:
def get_equal_text(value1: object, value2: object) -> str:
equal = value1 == value2 and isinstance(value1, value2.__class__)
return f'{get_value_text(value1):s} {"==" if equal else "!="} {get_value_text(value2):s}'


def get_value_text(value: object) -> str:
return f'{get_bytes_text(value)} ({str(type(value))[8:-2]})'
7 changes: 1 addition & 6 deletions run/conf_testing/lib/HABAppTests/errors.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
class TestCaseFailed(Exception):
def __init__(self, msg: str) -> None:
self.msg = msg


class TestCaseWarning(Exception):
class TestCaseFailed(Exception): # noqa: N818
def __init__(self, msg: str) -> None:
self.msg = msg
79 changes: 52 additions & 27 deletions run/conf_testing/lib/HABAppTests/event_waiter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import asyncio
import logging
import time
from collections.abc import Generator
from time import monotonic
from types import TracebackType
from typing import Any, TypeVar

Expand All @@ -18,8 +21,6 @@

log = logging.getLogger('HABApp.Tests')

EVENT_TYPE = TypeVar('EVENT_TYPE')


class EventWaiter:
def __init__(self, name: BaseValueItem | str,
Expand All @@ -29,61 +30,85 @@ def __init__(self, name: BaseValueItem | str,
assert isinstance(name, str)
assert isinstance(event_filter, EventFilterBase)

self.name = name
self.event_filter = event_filter
self.timeout = timeout
self._name = name
self._event_filter = event_filter
self._timeout = timeout

self.event_listener = EventBusListener(
self.name,
self._event_listener = EventBusListener(
self._name,
wrap_func(self.__process_event),
self.event_filter
self._event_filter
)

self._received_events = []

def __process_event(self, event) -> None:
if isinstance(self.event_filter, EventFilter):
assert isinstance(event, self.event_filter.event_class)
if isinstance(self._event_filter, EventFilter):
assert isinstance(event, self._event_filter.event_class)
self._received_events.append(event)

def clear(self) -> None:
self._received_events.clear()

def wait_for_event(self, **kwargs) -> EVENT_TYPE:

start = time.time()

while True:
time.sleep(0.02)
def _check_wait_event(self, attribs: dict[str, Any]) -> Generator[float, Any, Any]:
start = monotonic()
end = start + self._timeout

if time.time() > start + self.timeout:
expected_values = 'with ' + ', '.join([f'{__k}={__v}' for __k, __v in kwargs.items()]) if kwargs else ''
msg = f'Timeout while waiting for {self.event_filter.describe()} for {self.name} {expected_values}'
raise TestCaseFailed(msg)
while monotonic() < end:
yield 0.01

if not self._received_events:
continue

event = self._received_events.pop()

if kwargs:
if self.compare_event_value(event, kwargs):
if attribs:
if self.compare_event_value(event, attribs):
return event
continue

return event

raise ValueError()
expected_values = 'with ' + ', '.join([f'{__k}={__v}' for __k, __v in attribs.items()]) if attribs else ''
msg = f'Timeout while waiting for {self._event_filter.describe()} for {self._name} {expected_values}'
raise TestCaseFailed(msg)

def wait_for_event(self, **kwargs: Any) -> Any:
gen = self._check_wait_event(kwargs)
try:
while True:
delay = next(gen)
time.sleep(delay)
except StopIteration as e:
event = e.value

if event is None:
raise ValueError()
return event

async def async_wait_for_event(self, **kwargs: Any) -> Any:
gen = self._check_wait_event(kwargs)
try:
while True:
delay = next(gen)
await asyncio.sleep(delay)
except StopIteration as e:
event = e.value

if event is None:
raise ValueError()
return event

def __enter__(self) -> 'EventWaiter':
get_current_context().add_event_listener(self.event_listener)
get_current_context().add_event_listener(self._event_listener)
return self

def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> bool:
get_current_context().remove_event_listener(self.event_listener)
def __exit__(self, exc_type: type[BaseException] | None,
exc_val: BaseException | None, exc_tb: TracebackType | None) -> bool:
get_current_context().remove_event_listener(self._event_listener)

@staticmethod
def compare_event_value(event, kwargs: dict[str, Any]):
def compare_event_value(event: Any, kwargs: dict[str, Any]) -> bool:
only_value = 'value' in kwargs and len(kwargs) == 1
val_msg = []

Expand Down
62 changes: 38 additions & 24 deletions run/conf_testing/lib/HABAppTests/item_waiter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import asyncio
import logging
import time
from collections.abc import Generator
from time import monotonic
from types import TracebackType
from typing import Any, Final

from HABApp.core.items import BaseValueItem
from HABAppTests.compare_values import get_equal_text
Expand All @@ -11,38 +15,48 @@


class ItemWaiter:
def __init__(self, item, timeout=1) -> None:
self.item = item
assert isinstance(item, BaseValueItem), f'{item} is not an Item'
def __init__(self, item: str | BaseValueItem, timeout: float = 1) -> None:
self._item: Final = item if not isinstance(item, str) else BaseValueItem.get_item(item)
assert isinstance(self._item, BaseValueItem), f'{self._item} is not an Item'

self.timeout = timeout
self._timeout: Final = timeout

def wait_for_attribs(self, **kwargs) -> bool:
start = time.time()
end = start + self.timeout
def _check_attribs(self, attribs: dict[str, Any]) -> Generator[float, Any, None]:
start = monotonic()
end = start + self._timeout

while True:
time.sleep(0.01)
while monotonic() < end:
yield 0.01

for name, target in kwargs.items():
if getattr(self.item, name) != target:
for name, target in attribs.items():
if getattr(self._item, name) != target:
break
else:
return True

if time.time() > end:
indent = max(map(len, kwargs))
failed = [
f'{name:>{indent:d}s}: {get_equal_text(getattr(self.item, name), target)}'
for name, target in kwargs.items()
]
failed_msg = '\n'.join(failed)
msg = f'Timeout waiting for {self.item.name}!\n{failed_msg}'
raise TestCaseFailed(msg)

def wait_for_state(self, state=None):
return None

indent = max(map(len, attribs))
failed = [
f'{name:>{indent:d}s}: {get_equal_text(getattr(self._item, name), target)}'
for name, target in attribs.items()
]
failed_msg = '\n'.join(failed)
msg = f'Timeout waiting for {self._item.name}!\n{failed_msg}'
raise TestCaseFailed(msg)

def wait_for_attribs(self, **kwargs) -> None:
for delay in self._check_attribs(kwargs):
time.sleep(delay)

async def async_wait_for_attribs(self, **kwargs) -> None:
for delay in self._check_attribs(kwargs):
await asyncio.sleep(delay)

def wait_for_state(self, state=None) -> None:
return self.wait_for_attribs(value=state)

async def async_wait_for_state(self, state=None) -> None:
return await self.async_wait_for_attribs(value=state)

def __enter__(self) -> 'ItemWaiter':
return self

Expand Down
Loading

0 comments on commit 681cef8

Please sign in to comment.