Skip to content

Commit

Permalink
Store tmp value for polling client data points (#1856)
Browse files Browse the repository at this point in the history
* Store tmp value for polling client data points

* Name fix

* Add return value

* write tmp value if needed
  • Loading branch information
SukramJ authored Nov 19, 2024
1 parent bfabec4 commit 183e6c0
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 50 deletions.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Version 2024.11.4 (2024-11-19)

- Store tmp value for polling client data points

# Version 2024.11.3 (2024-11-18)

- Add interface(id) to performance log message
Expand Down
15 changes: 8 additions & 7 deletions hahomematic/caches/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from hahomematic.const import (
DP_KEY,
DP_KEY_VALUE,
INIT_DATETIME,
MAX_CACHE_AGE,
NO_CACHE_ENTRY,
Expand Down Expand Up @@ -46,7 +47,7 @@ def add_set_value(
channel_address: str,
parameter: str,
value: Any,
) -> set[DP_KEY]:
) -> set[DP_KEY_VALUE]:
"""Add data from set value command."""
if parameter in CONVERTABLE_PARAMETERS:
return self.add_combined_parameter(
Expand All @@ -60,13 +61,13 @@ def add_set_value(
parameter=parameter,
)
self._last_send_command[data_point_key] = (value, datetime.now())
return {data_point_key}
return {(data_point_key, value)}

def add_put_paramset(
self, channel_address: str, paramset_key: ParamsetKey, values: dict[str, Any]
) -> set[DP_KEY]:
) -> set[DP_KEY_VALUE]:
"""Add data from put paramset command."""
data_point_keys: set[DP_KEY] = set()
data_point_key_values: set[DP_KEY_VALUE] = set()
for parameter, value in values.items():
data_point_key = get_data_point_key(
interface_id=self._interface_id,
Expand All @@ -75,12 +76,12 @@ def add_put_paramset(
parameter=parameter,
)
self._last_send_command[data_point_key] = (value, datetime.now())
data_point_keys.add(data_point_key)
return data_point_keys
data_point_key_values.add((data_point_key, value))
return data_point_key_values

def add_combined_parameter(
self, parameter: str, channel_address: str, combined_parameter: str
) -> set[DP_KEY]:
) -> set[DP_KEY_VALUE]:
"""Add data from combined parameter."""
if values := convert_combined_parameter_to_paramset(
parameter=parameter, cpv=combined_parameter
Expand Down
50 changes: 32 additions & 18 deletions hahomematic/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
DATETIME_FORMAT_MILLIS,
DEFAULT_CUSTOM_ID,
DEFAULT_MAX_WORKERS,
DP_KEY,
DP_KEY_VALUE,
DUMMY_SERIAL,
INIT_DATETIME,
INTERFACES_SUPPORTING_FIRMWARE_UPDATES,
Expand Down Expand Up @@ -565,7 +565,7 @@ async def _set_value(
wait_for_callback: int | None,
rx_mode: CommandRxMode | None = None,
check_against_pd: bool = False,
) -> set[DP_KEY]:
) -> set[DP_KEY_VALUE]:
"""Set single value on paramset VALUES."""
try:
checked_value = (
Expand Down Expand Up @@ -594,21 +594,22 @@ async def _set_value(
channel_address=channel_address, parameter=parameter, value=value
)
# store the send value in the last_value_send_cache
data_point_keys = self._last_value_send_cache.add_set_value(
data_point_key_values = self._last_value_send_cache.add_set_value(
channel_address=channel_address, parameter=parameter, value=checked_value
)
self._write_tmp_value(data_point_key_values=data_point_key_values)

if wait_for_callback is not None and (
device := self.central.get_device(
address=get_device_address(address=channel_address)
)
):
await _wait_for_state_change_or_timeout(
device=device,
data_point_keys=data_point_keys,
values={parameter: checked_value},
data_point_key_values=data_point_key_values,
wait_for_callback=wait_for_callback,
)
return data_point_keys # noqa: TRY300
return data_point_key_values # noqa: TRY300
except BaseHomematicException as ex:
raise ClientException(
f"SET_VALUE failed for {channel_address}/{parameter}/{value}: {reduce_args(args=ex.args)}"
Expand Down Expand Up @@ -639,6 +640,19 @@ def _check_set_value(
operation=Operations.WRITE,
)

def _write_tmp_value(self, data_point_key_values: set[DP_KEY_VALUE]) -> None:
"""Write data point temp value."""
if self.supports_push_updates:
return

for data_point_key_value in data_point_key_values:
data_point_key, value = data_point_key_value
interface_id, channel_address, paramset_key, parameter = data_point_key
if data_point := self.central.get_generic_data_point(
channel_address=channel_address, parameter=parameter, paramset_key=paramset_key
):
data_point.write_temp_value(value=value)

async def set_value(
self,
channel_address: str,
Expand All @@ -648,7 +662,7 @@ async def set_value(
wait_for_callback: int | None = WAIT_FOR_CALLBACK,
rx_mode: CommandRxMode | None = None,
check_against_pd: bool = False,
) -> set[DP_KEY]:
) -> set[DP_KEY_VALUE]:
"""Set single value on paramset VALUES."""
if paramset_key == ParamsetKey.VALUES:
return await self._set_value( # type: ignore[no-any-return]
Expand Down Expand Up @@ -704,7 +718,7 @@ async def put_paramset(
wait_for_callback: int | None = WAIT_FOR_CALLBACK,
rx_mode: CommandRxMode | None = None,
check_against_pd: bool = False,
) -> set[DP_KEY]:
) -> set[DP_KEY_VALUE]:
"""
Set paramsets manually.
Expand Down Expand Up @@ -759,23 +773,24 @@ async def put_paramset(
return set()

# store the send value in the last_value_send_cache
data_point_keys = self._last_value_send_cache.add_put_paramset(
data_point_key_values = self._last_value_send_cache.add_put_paramset(
channel_address=channel_address,
paramset_key=ParamsetKey(paramset_key),
values=checked_values,
)
self._write_tmp_value(data_point_key_values=data_point_key_values)

if wait_for_callback is not None and (
device := self.central.get_device(
address=get_device_address(address=channel_address)
)
):
await _wait_for_state_change_or_timeout(
device=device,
data_point_keys=data_point_keys,
values=checked_values,
data_point_key_values=data_point_key_values,
wait_for_callback=wait_for_callback,
)
return data_point_keys # noqa: TRY300
return data_point_key_values # noqa: TRY300
except BaseHomematicException as ex:
raise ClientException(
f"PUT_PARAMSET failed for {channel_address}/{paramset_key}/{values}: {reduce_args(args=ex.args)}"
Expand Down Expand Up @@ -1694,29 +1709,28 @@ def get_client(interface_id: str) -> Client | None:
@measure_execution_time
async def _wait_for_state_change_or_timeout(
device: Device,
data_point_keys: set[DP_KEY],
values: dict[str, Any],
data_point_key_values: set[DP_KEY_VALUE],
wait_for_callback: int,
) -> None:
"""Wait for a data_point to change state."""
waits = [
_track_single_data_point_state_change_or_timeout(
device=device,
data_point_key=data_point_key,
value=values.get(data_point_key[1]),
data_point_key_value=data_point_key_value,
wait_for_callback=wait_for_callback,
)
for data_point_key in data_point_keys
for data_point_key_value in data_point_key_values
]
await asyncio.gather(*waits)


@measure_execution_time
async def _track_single_data_point_state_change_or_timeout(
device: Device, data_point_key: DP_KEY, value: Any, wait_for_callback: int
device: Device, data_point_key_value: DP_KEY_VALUE, wait_for_callback: int
) -> None:
"""Wait for a data_point to change state."""
ev = asyncio.Event()
data_point_key, value = data_point_key_value

def _async_event_changed(*args: Any, **kwargs: Any) -> None:
if dp:
Expand Down
3 changes: 2 additions & 1 deletion hahomematic/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import re
from typing import Any, Final, Required, TypedDict

VERSION: Final = "2024.11.3"
VERSION: Final = "2024.11.4"

DEFAULT_CONNECTION_CHECKER_INTERVAL: Final = 15 # check if connection is available via rpc ping
DEFAULT_CUSTOM_ID: Final = "custom_id"
Expand Down Expand Up @@ -480,6 +480,7 @@ class ParameterType(StrEnum):

# interface_id, channel_address, paramset_key,parameter
DP_KEY = tuple[str, str, ParamsetKey, str]
DP_KEY_VALUE = tuple[DP_KEY, Any]

HMIP_FIRMWARE_UPDATE_IN_PROGRESS_STATES: Final[tuple[DeviceFirmwareState, ...]] = (
DeviceFirmwareState.DO_UPDATE_PENDING,
Expand Down
72 changes: 57 additions & 15 deletions hahomematic/model/data_point.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
CALLBACK_TYPE,
DEFAULT_CUSTOM_ID,
DP_KEY,
DP_KEY_VALUE,
INIT_DATETIME,
KEY_CHANNEL_OPERATION_MODE_VISIBILITY,
KWARGS_ARG_DATA_POINT,
Expand Down Expand Up @@ -124,6 +125,8 @@ def __init__(self, central: hmcu.CentralUnit, unique_id: str) -> None:
self._path_data = self._get_path_data()
self._modified_at: datetime = INIT_DATETIME
self._refreshed_at: datetime = INIT_DATETIME
self._tmp_modified_at: datetime = INIT_DATETIME
self._tmp_refreshed_at: datetime = INIT_DATETIME
self._service_methods: dict[str, Callable] = {}

@state_property
Expand Down Expand Up @@ -159,11 +162,15 @@ def full_name(self) -> str:
@state_property
def modified_at(self) -> datetime:
"""Return the last update datetime value."""
if self._tmp_modified_at > self._modified_at:
return self._tmp_modified_at
return self._modified_at

@state_property
def refreshed_at(self) -> datetime:
"""Return the last refresh datetime value."""
if self._tmp_refreshed_at > self._refreshed_at:
return self._tmp_refreshed_at
return self._refreshed_at

@config_property
Expand Down Expand Up @@ -282,14 +289,23 @@ def fire_device_removed_callback(self, *args: Any) -> None:
_LOGGER.warning("FIRE_DEVICE_REMOVED_EVENT failed: %s", reduce_args(args=ex.args))

def _set_modified_at(self, now: datetime = datetime.now()) -> None:
"""Set last_update to current datetime."""
"""Set modified_at to current datetime."""
self._modified_at = now
self._set_refreshed_at(now=now)

def _set_refreshed_at(self, now: datetime = datetime.now()) -> None:
"""Set last_update to current datetime."""
"""Set refreshed_at to current datetime."""
self._refreshed_at = now

def _set_tmp_modified_at(self, now: datetime = datetime.now()) -> None:
"""Set tmp_modified_at to current datetime."""
self._tmp_modified_at = now
self._set_tmp_refreshed_at(now=now)

def _set_tmp_refreshed_at(self, now: datetime = datetime.now()) -> None:
"""Set tmp_refreshed_at to current datetime."""
self._tmp_refreshed_at = now

def __str__(self) -> str:
"""Provide some useful information."""
return f"path: {self.state_path}, name: {self.full_name}"
Expand Down Expand Up @@ -431,8 +447,8 @@ def __init__(
)
self._value: ParameterT = None # type: ignore[assignment]
self._old_value: ParameterT = None # type: ignore[assignment]
self._modified_at: datetime = INIT_DATETIME
self._refreshed_at: datetime = INIT_DATETIME
self._tmp_value: ParameterT = None # type: ignore[assignment]

self._state_uncertain: bool = True
self._is_forced_sensor: bool = False
self._assign_parameter_data(parameter_data=parameter_data)
Expand Down Expand Up @@ -574,6 +590,8 @@ def state_uncertain(self) -> bool:
@state_property
def value(self) -> ParameterT:
"""Return the value of the data_point."""
if self._tmp_refreshed_at > self._refreshed_at:
return self._tmp_value
return self._value

@property
Expand Down Expand Up @@ -698,6 +716,9 @@ async def load_data_point_value(

def write_value(self, value: Any) -> tuple[ParameterT, ParameterT]:
"""Update value of the data_point."""

self._reset_tmp_value()

old_value = self._value
if value == NO_CACHE_ENTRY:
if self.refreshed_at != INIT_DATETIME:
Expand All @@ -716,6 +737,17 @@ def write_value(self, value: Any) -> tuple[ParameterT, ParameterT]:
self.fire_data_point_updated_callback()
return (old_value, new_value)

def write_temp_value(self, value: Any) -> None:
"""Update the temporary value of the data_point."""
temp_value = self._convert_value(value)
if self._value == temp_value:
self._set_refreshed_at()
else:
self._set_tmp_modified_at()
self._tmp_value = temp_value
self._state_uncertain = True
self.fire_data_point_updated_callback()

def update_parameter_data(self) -> None:
"""Update parameter data."""
if parameter_data := self._central.paramset_descriptions.get_parameter_data(
Expand Down Expand Up @@ -755,6 +787,11 @@ def _convert_value(self, value: Any) -> ParameterT:
)
return None # type: ignore[return-value]

def _reset_tmp_value(self) -> None:
"""Reset the temp storage."""
self._tmp_value = None # type: ignore[assignment]
self._set_modified_at(now=INIT_DATETIME)

def get_event_data(self, value: Any = None) -> dict[EventKey, Any]:
"""Get the event_data."""
event_data = {
Expand Down Expand Up @@ -801,28 +838,33 @@ def add_data_point(
data_point.parameter
] = value

async def send_data(self, wait_for_callback: int | None) -> bool:
async def send_data(self, wait_for_callback: int | None) -> set[DP_KEY_VALUE]:
"""Send data to backend."""
data_point_key_values: set[DP_KEY_VALUE] = set()
for paramset_key, paramsets in self._paramsets.items():
for paramset_no in dict(sorted(paramsets.items())).values():
for channel_address, paramset in paramset_no.items():
if len(paramset.values()) == 1:
for parameter, value in paramset.items():
await self._client.set_value(
data_point_key_values.update(
await self._client.set_value(
channel_address=channel_address,
paramset_key=paramset_key,
parameter=parameter,
value=value,
wait_for_callback=wait_for_callback,
)
)
else:
data_point_key_values.update(
await self._client.put_paramset(
channel_address=channel_address,
paramset_key=paramset_key,
parameter=parameter,
value=value,
values=paramset,
wait_for_callback=wait_for_callback,
)
else:
await self._client.put_paramset(
channel_address=channel_address,
paramset_key=paramset_key,
values=paramset,
wait_for_callback=wait_for_callback,
)
return True
return data_point_key_values


def bind_collector(
Expand Down
Loading

0 comments on commit 183e6c0

Please sign in to comment.