Skip to content

Commit

Permalink
Wait for target value in wait_for_state_change_or_timeout (#1522)
Browse files Browse the repository at this point in the history
* Wait for target value in wait_for_state_change_or_timeout

* Add default for WAIT_FOR_CALLBACK

* Reduce API

* Update __init__.py

* Update __init__.py
  • Loading branch information
SukramJ authored Apr 21, 2024
1 parent dc11487 commit dffebbe
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 174 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Version 2024.4.10 (2024-04-21)

- Add wait_for_callback to collector
- Wait for target value in wait_for_state_change_or_timeout

# Version 2024.4.9 (2024-04-20)

Expand Down
75 changes: 59 additions & 16 deletions hahomematic/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from hahomematic import central as hmcu
from hahomematic.caches.dynamic import CommandCache, PingPongCache
from hahomematic.client.xml_rpc import XmlRpcProxy
from hahomematic.config import CALLBACK_WARN_INTERVAL, RECONNECT_WAIT
from hahomematic.config import CALLBACK_WARN_INTERVAL, RECONNECT_WAIT, WAIT_FOR_CALLBACK
from hahomematic.const import (
DATETIME_FORMAT_MILLIS,
DEFAULT_CUSTOM_ID,
Expand Down Expand Up @@ -429,7 +429,7 @@ async def _set_value(
channel_address: str,
parameter: str,
value: Any,
wait_for_callback: bool,
wait_for_callback: int | None,
rx_mode: str | None = None,
) -> set[ENTITY_KEY]:
"""Set single value on paramset VALUES."""
Expand All @@ -443,12 +443,17 @@ async def _set_value(
entity_keys = self._last_value_send_cache.add_set_value(
channel_address=channel_address, parameter=parameter, value=value
)
if wait_for_callback and (
if wait_for_callback is not None and (
device := self.central.get_device(
address=get_device_address(address=channel_address)
)
):
await wait_for_state_change_or_timeout(device=device, entity_keys=entity_keys)
await wait_for_state_change_or_timeout(
device=device,
entity_keys=entity_keys,
values={parameter: value},
wait_for_callback=wait_for_callback,
)
return entity_keys # noqa: TRY300
except BaseHomematicException as ex:
_LOGGER.warning(
Expand All @@ -467,7 +472,7 @@ async def set_value(
paramset_key: str,
parameter: str,
value: Any,
wait_for_callback: bool = False,
wait_for_callback: int | None = WAIT_FOR_CALLBACK,
rx_mode: str | None = None,
) -> set[ENTITY_KEY]:
"""Set single value on paramset VALUES."""
Expand Down Expand Up @@ -517,7 +522,7 @@ async def put_paramset(
channel_address: str,
paramset_key: str,
values: dict[str, Any],
wait_for_callback: bool = False,
wait_for_callback: int | None = WAIT_FOR_CALLBACK,
rx_mode: str | None = None,
) -> set[ENTITY_KEY]:
"""
Expand All @@ -534,14 +539,21 @@ async def put_paramset(
await self._proxy.putParamset(channel_address, paramset_key, values)
# store the send value in the last_value_send_cache
entity_keys = self._last_value_send_cache.add_put_paramset(
channel_address=channel_address, paramset_key=paramset_key, values=values
channel_address=channel_address,
paramset_key=paramset_key,
values=values,
)
if wait_for_callback and (
if wait_for_callback is not None and (
device := self.central.get_device(
address=get_device_address(address=channel_address)
)
):
await wait_for_state_change_or_timeout(device=device, entity_keys=entity_keys)
await wait_for_state_change_or_timeout(
device=device,
entity_keys=entity_keys,
values=values,
wait_for_callback=wait_for_callback,
)
return entity_keys # noqa: TRY300
except BaseHomematicException as ex:
_LOGGER.warning(
Expand Down Expand Up @@ -1120,41 +1132,72 @@ def get_client(interface_id: str) -> Client | None:
return None


@measure_execution_time
async def wait_for_state_change_or_timeout(
device: HmDevice, entity_keys: set[ENTITY_KEY], timeout: float = 30.0
device: HmDevice, entity_keys: set[ENTITY_KEY], values: dict[str, Any], wait_for_callback: int
) -> None:
"""Wait for an entity to change state."""
waits = [
_track_single_entity_state_change_or_timeout(
device=device, entity_key=entity_key, timeout=timeout
device=device,
entity_key=entity_key,
value=values.get(entity_key[1]),
wait_for_callback=wait_for_callback,
)
for entity_key in entity_keys
]
await asyncio.gather(*waits)


@measure_execution_time
async def _track_single_entity_state_change_or_timeout(
device: HmDevice, entity_key: ENTITY_KEY, timeout: float
device: HmDevice, entity_key: ENTITY_KEY, value: Any, wait_for_callback: int
) -> None:
"""Wait for an entity to change state."""
ev = asyncio.Event()

def _async_event_changed(*args: Any, **kwargs: Any) -> None:
ev.set()
_LOGGER.debug("Changed event %s", entity_key)
if entity:
_LOGGER.debug(
"TRACK_SINGLE_ENTITY_STATE_CHANGE_OR_TIMEOUT: Received event %s with value %s",
entity_key,
entity.value,
)
if _isclose(value, entity.value):
_LOGGER.debug(
"TRACK_SINGLE_ENTITY_STATE_CHANGE_OR_TIMEOUT: Finished event %s with value %s",
entity_key,
entity.value,
)
ev.set()

channel_address, parameter = entity_key
if entity := device.get_generic_entity(channel_address=channel_address, parameter=parameter):
if not entity.supports_events:
_LOGGER.debug(
"TRACK_SINGLE_ENTITY_STATE_CHANGE_OR_TIMEOUT: Entity supports no events %s",
entity_key,
)
return
unsub = entity.register_entity_updated_callback(
entity_updated_callback=_async_event_changed, custom_id=DEFAULT_CUSTOM_ID
)

try:
async with asyncio.timeout(timeout):
async with asyncio.timeout(wait_for_callback):
await ev.wait()
except TimeoutError:
pass
_LOGGER.debug(
"TRACK_SINGLE_ENTITY_STATE_CHANGE_OR_TIMEOUT: Timeout waiting for event %s with value %s",
entity_key,
entity.value,
)
finally:
unsub()


def _isclose(value1: Any, value2: Any) -> bool:
"""Check if the both values are close to each other."""
if isinstance(value1, float):
return bool(round(value1, 2) == round(value2, 2))
return bool(value1 == value2)
2 changes: 2 additions & 0 deletions hahomematic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
DEFAULT_PING_PONG_MISMATCH_COUNT_TTL,
DEFAULT_RECONNECT_WAIT,
DEFAULT_TIMEOUT,
DEFAULT_WAIT_FOR_CALLBACK,
)

CALLBACK_WARN_INTERVAL = DEFAULT_CONNECTION_CHECKER_INTERVAL * 40
Expand All @@ -20,3 +21,4 @@
PING_PONG_MISMATCH_COUNT_TTL = DEFAULT_PING_PONG_MISMATCH_COUNT_TTL
RECONNECT_WAIT = DEFAULT_RECONNECT_WAIT
TIMEOUT = DEFAULT_TIMEOUT
WAIT_FOR_CALLBACK = DEFAULT_WAIT_FOR_CALLBACK
1 change: 1 addition & 0 deletions hahomematic/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
DEFAULT_TIMEOUT: Final = 60 # default timeout for a connection
DEFAULT_TLS: Final = False
DEFAULT_VERIFY_TLS: Final = False
DEFAULT_WAIT_FOR_CALLBACK: Final[int | None] = None

REGA_SCRIPT_FETCH_ALL_DEVICE_DATA: Final = "fetch_all_device_data.fn"
REGA_SCRIPT_GET_SERIAL: Final = "get_serial.fn"
Expand Down
3 changes: 2 additions & 1 deletion hahomematic/performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
finally:
if is_enabled:
delta = (datetime.now() - start).total_seconds()
arg = str(args[0]) if len(args) > 0 else ""
_LOGGER.info(
"Execution of %s took %ss (%s)",
func.__name__,
delta,
str(args[0]),
arg,
)

@wraps(func)
Expand Down
11 changes: 8 additions & 3 deletions hahomematic/platforms/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from hahomematic import central as hmcu, client as hmcl, support as hms
from hahomematic.async_support import loop_check
from hahomematic.config import WAIT_FOR_CALLBACK
from hahomematic.const import (
CALLBACK_TYPE,
DEFAULT_CUSTOM_ID,
Expand Down Expand Up @@ -797,7 +798,7 @@ def add_entity(
self._paramsets[collector_order][entity.channel_address] = {}
self._paramsets[collector_order][entity.channel_address][entity.parameter] = value

async def send_data(self, wait_for_callback: bool) -> bool:
async def send_data(self, wait_for_callback: int | None) -> bool:
"""Send data to backend."""
for paramset_no in dict(sorted(self._paramsets.items())).values():
for channel_address, paramset in paramset_no.items():
Expand All @@ -821,7 +822,9 @@ async def send_data(self, wait_for_callback: bool) -> bool:
return True


def bind_collector(wait_for_callback: bool = False) -> Callable:
def bind_collector(
wait_for_callback: int | None = WAIT_FOR_CALLBACK,
) -> Callable:
"""Decorate function to automatically add collector if not set."""

def decorator_bind_collector(func: _CallableT) -> _CallableT:
Expand All @@ -843,7 +846,9 @@ async def wrapper_collector(*args: Any, **kwargs: Any) -> Any:
collector = CallParameterCollector(device=args[0].device)
kwargs[_COLLECTOR_ARGUMENT_NAME] = collector
return_value = await func(*args, **kwargs)
await collector.send_data(wait_for_callback=wait_for_callback)
await collector.send_data(
wait_for_callback=wait_for_callback,
)
return return_value

return wrapper_collector # type: ignore[return-value]
Expand Down
5 changes: 3 additions & 2 deletions hahomematic_support/client_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import orjson

from hahomematic.client import _LOGGER, Client, _ClientConfig
from hahomematic.config import WAIT_FOR_CALLBACK
from hahomematic.const import (
DEFAULT_ENCODING,
ENTITY_KEY,
Expand Down Expand Up @@ -196,7 +197,7 @@ async def set_value(
paramset_key: str,
parameter: str,
value: Any,
wait_for_callback: bool = False,
wait_for_callback: int | None = WAIT_FOR_CALLBACK,
rx_mode: str | None = None,
) -> set[ENTITY_KEY]:
"""Set single value on paramset VALUES."""
Expand Down Expand Up @@ -250,7 +251,7 @@ async def put_paramset(
channel_address: str,
paramset_key: str,
values: Any,
wait_for_callback: bool = False,
wait_for_callback: int | None = WAIT_FOR_CALLBACK,
rx_mode: str | None = None,
) -> set[ENTITY_KEY]:
"""
Expand Down
Loading

0 comments on commit dffebbe

Please sign in to comment.