Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store tmp value for polling client data points #1856

Merged
merged 4 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Version 2024.11.4 (2024-11-19)

- Store tmp value for polling client data points

# Version 2024.11.3 (2024-11-18)

- Add interface(id) to performance log message
Expand Down
15 changes: 8 additions & 7 deletions hahomematic/caches/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from hahomematic.const import (
DP_KEY,
DP_KEY_VALUE,
INIT_DATETIME,
MAX_CACHE_AGE,
NO_CACHE_ENTRY,
Expand Down Expand Up @@ -46,7 +47,7 @@ def add_set_value(
channel_address: str,
parameter: str,
value: Any,
) -> set[DP_KEY]:
) -> set[DP_KEY_VALUE]:
"""Add data from set value command."""
if parameter in CONVERTABLE_PARAMETERS:
return self.add_combined_parameter(
Expand All @@ -60,13 +61,13 @@ def add_set_value(
parameter=parameter,
)
self._last_send_command[data_point_key] = (value, datetime.now())
return {data_point_key}
return {(data_point_key, value)}

def add_put_paramset(
self, channel_address: str, paramset_key: ParamsetKey, values: dict[str, Any]
) -> set[DP_KEY]:
) -> set[DP_KEY_VALUE]:
"""Add data from put paramset command."""
data_point_keys: set[DP_KEY] = set()
data_point_key_values: set[DP_KEY_VALUE] = set()
for parameter, value in values.items():
data_point_key = get_data_point_key(
interface_id=self._interface_id,
Expand All @@ -75,12 +76,12 @@ def add_put_paramset(
parameter=parameter,
)
self._last_send_command[data_point_key] = (value, datetime.now())
data_point_keys.add(data_point_key)
return data_point_keys
data_point_key_values.add((data_point_key, value))
return data_point_key_values

def add_combined_parameter(
self, parameter: str, channel_address: str, combined_parameter: str
) -> set[DP_KEY]:
) -> set[DP_KEY_VALUE]:
"""Add data from combined parameter."""
if values := convert_combined_parameter_to_paramset(
parameter=parameter, cpv=combined_parameter
Expand Down
50 changes: 32 additions & 18 deletions hahomematic/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
DATETIME_FORMAT_MILLIS,
DEFAULT_CUSTOM_ID,
DEFAULT_MAX_WORKERS,
DP_KEY,
DP_KEY_VALUE,
DUMMY_SERIAL,
INIT_DATETIME,
INTERFACES_SUPPORTING_FIRMWARE_UPDATES,
Expand Down Expand Up @@ -565,7 +565,7 @@ async def _set_value(
wait_for_callback: int | None,
rx_mode: CommandRxMode | None = None,
check_against_pd: bool = False,
) -> set[DP_KEY]:
) -> set[DP_KEY_VALUE]:
"""Set single value on paramset VALUES."""
try:
checked_value = (
Expand Down Expand Up @@ -594,21 +594,22 @@ async def _set_value(
channel_address=channel_address, parameter=parameter, value=value
)
# store the send value in the last_value_send_cache
data_point_keys = self._last_value_send_cache.add_set_value(
data_point_key_values = self._last_value_send_cache.add_set_value(
channel_address=channel_address, parameter=parameter, value=checked_value
)
self._write_tmp_value(data_point_key_values=data_point_key_values)

if wait_for_callback is not None and (
device := self.central.get_device(
address=get_device_address(address=channel_address)
)
):
await _wait_for_state_change_or_timeout(
device=device,
data_point_keys=data_point_keys,
values={parameter: checked_value},
data_point_key_values=data_point_key_values,
wait_for_callback=wait_for_callback,
)
return data_point_keys # noqa: TRY300
return data_point_key_values # noqa: TRY300
except BaseHomematicException as ex:
raise ClientException(
f"SET_VALUE failed for {channel_address}/{parameter}/{value}: {reduce_args(args=ex.args)}"
Expand Down Expand Up @@ -639,6 +640,19 @@ def _check_set_value(
operation=Operations.WRITE,
)

def _write_tmp_value(self, data_point_key_values: set[DP_KEY_VALUE]) -> None:
"""Write data point temp value."""
if self.supports_push_updates:
return

for data_point_key_value in data_point_key_values:
data_point_key, value = data_point_key_value
interface_id, channel_address, paramset_key, parameter = data_point_key
if data_point := self.central.get_generic_data_point(
channel_address=channel_address, parameter=parameter, paramset_key=paramset_key
):
data_point.write_temp_value(value=value)

async def set_value(
self,
channel_address: str,
Expand All @@ -648,7 +662,7 @@ async def set_value(
wait_for_callback: int | None = WAIT_FOR_CALLBACK,
rx_mode: CommandRxMode | None = None,
check_against_pd: bool = False,
) -> set[DP_KEY]:
) -> set[DP_KEY_VALUE]:
"""Set single value on paramset VALUES."""
if paramset_key == ParamsetKey.VALUES:
return await self._set_value( # type: ignore[no-any-return]
Expand Down Expand Up @@ -704,7 +718,7 @@ async def put_paramset(
wait_for_callback: int | None = WAIT_FOR_CALLBACK,
rx_mode: CommandRxMode | None = None,
check_against_pd: bool = False,
) -> set[DP_KEY]:
) -> set[DP_KEY_VALUE]:
"""
Set paramsets manually.

Expand Down Expand Up @@ -759,23 +773,24 @@ async def put_paramset(
return set()

# store the send value in the last_value_send_cache
data_point_keys = self._last_value_send_cache.add_put_paramset(
data_point_key_values = self._last_value_send_cache.add_put_paramset(
channel_address=channel_address,
paramset_key=ParamsetKey(paramset_key),
values=checked_values,
)
self._write_tmp_value(data_point_key_values=data_point_key_values)

if wait_for_callback is not None and (
device := self.central.get_device(
address=get_device_address(address=channel_address)
)
):
await _wait_for_state_change_or_timeout(
device=device,
data_point_keys=data_point_keys,
values=checked_values,
data_point_key_values=data_point_key_values,
wait_for_callback=wait_for_callback,
)
return data_point_keys # noqa: TRY300
return data_point_key_values # noqa: TRY300
except BaseHomematicException as ex:
raise ClientException(
f"PUT_PARAMSET failed for {channel_address}/{paramset_key}/{values}: {reduce_args(args=ex.args)}"
Expand Down Expand Up @@ -1694,29 +1709,28 @@ def get_client(interface_id: str) -> Client | None:
@measure_execution_time
async def _wait_for_state_change_or_timeout(
device: Device,
data_point_keys: set[DP_KEY],
values: dict[str, Any],
data_point_key_values: set[DP_KEY_VALUE],
wait_for_callback: int,
) -> None:
"""Wait for a data_point to change state."""
waits = [
_track_single_data_point_state_change_or_timeout(
device=device,
data_point_key=data_point_key,
value=values.get(data_point_key[1]),
data_point_key_value=data_point_key_value,
wait_for_callback=wait_for_callback,
)
for data_point_key in data_point_keys
for data_point_key_value in data_point_key_values
]
await asyncio.gather(*waits)


@measure_execution_time
async def _track_single_data_point_state_change_or_timeout(
device: Device, data_point_key: DP_KEY, value: Any, wait_for_callback: int
device: Device, data_point_key_value: DP_KEY_VALUE, wait_for_callback: int
) -> None:
"""Wait for a data_point to change state."""
ev = asyncio.Event()
data_point_key, value = data_point_key_value

def _async_event_changed(*args: Any, **kwargs: Any) -> None:
if dp:
Expand Down
3 changes: 2 additions & 1 deletion hahomematic/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import re
from typing import Any, Final, Required, TypedDict

VERSION: Final = "2024.11.3"
VERSION: Final = "2024.11.4"

DEFAULT_CONNECTION_CHECKER_INTERVAL: Final = 15 # check if connection is available via rpc ping
DEFAULT_CUSTOM_ID: Final = "custom_id"
Expand Down Expand Up @@ -480,6 +480,7 @@ class ParameterType(StrEnum):

# interface_id, channel_address, paramset_key,parameter
DP_KEY = tuple[str, str, ParamsetKey, str]
DP_KEY_VALUE = tuple[DP_KEY, Any]

HMIP_FIRMWARE_UPDATE_IN_PROGRESS_STATES: Final[tuple[DeviceFirmwareState, ...]] = (
DeviceFirmwareState.DO_UPDATE_PENDING,
Expand Down
72 changes: 57 additions & 15 deletions hahomematic/model/data_point.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
CALLBACK_TYPE,
DEFAULT_CUSTOM_ID,
DP_KEY,
DP_KEY_VALUE,
INIT_DATETIME,
KEY_CHANNEL_OPERATION_MODE_VISIBILITY,
KWARGS_ARG_DATA_POINT,
Expand Down Expand Up @@ -124,6 +125,8 @@ def __init__(self, central: hmcu.CentralUnit, unique_id: str) -> None:
self._path_data = self._get_path_data()
self._modified_at: datetime = INIT_DATETIME
self._refreshed_at: datetime = INIT_DATETIME
self._tmp_modified_at: datetime = INIT_DATETIME
self._tmp_refreshed_at: datetime = INIT_DATETIME
self._service_methods: dict[str, Callable] = {}

@state_property
Expand Down Expand Up @@ -159,11 +162,15 @@ def full_name(self) -> str:
@state_property
def modified_at(self) -> datetime:
"""Return the last update datetime value."""
if self._tmp_modified_at > self._modified_at:
return self._tmp_modified_at
return self._modified_at

@state_property
def refreshed_at(self) -> datetime:
"""Return the last refresh datetime value."""
if self._tmp_refreshed_at > self._refreshed_at:
return self._tmp_refreshed_at
return self._refreshed_at

@config_property
Expand Down Expand Up @@ -282,14 +289,23 @@ def fire_device_removed_callback(self, *args: Any) -> None:
_LOGGER.warning("FIRE_DEVICE_REMOVED_EVENT failed: %s", reduce_args(args=ex.args))

def _set_modified_at(self, now: datetime = datetime.now()) -> None:
"""Set last_update to current datetime."""
"""Set modified_at to current datetime."""
self._modified_at = now
self._set_refreshed_at(now=now)

def _set_refreshed_at(self, now: datetime = datetime.now()) -> None:
"""Set last_update to current datetime."""
"""Set refreshed_at to current datetime."""
self._refreshed_at = now

def _set_tmp_modified_at(self, now: datetime = datetime.now()) -> None:
"""Set tmp_modified_at to current datetime."""
self._tmp_modified_at = now
self._set_tmp_refreshed_at(now=now)

def _set_tmp_refreshed_at(self, now: datetime = datetime.now()) -> None:
"""Set tmp_refreshed_at to current datetime."""
self._tmp_refreshed_at = now

def __str__(self) -> str:
"""Provide some useful information."""
return f"path: {self.state_path}, name: {self.full_name}"
Expand Down Expand Up @@ -431,8 +447,8 @@ def __init__(
)
self._value: ParameterT = None # type: ignore[assignment]
self._old_value: ParameterT = None # type: ignore[assignment]
self._modified_at: datetime = INIT_DATETIME
self._refreshed_at: datetime = INIT_DATETIME
self._tmp_value: ParameterT = None # type: ignore[assignment]

self._state_uncertain: bool = True
self._is_forced_sensor: bool = False
self._assign_parameter_data(parameter_data=parameter_data)
Expand Down Expand Up @@ -574,6 +590,8 @@ def state_uncertain(self) -> bool:
@state_property
def value(self) -> ParameterT:
"""Return the value of the data_point."""
if self._tmp_refreshed_at > self._refreshed_at:
return self._tmp_value
return self._value

@property
Expand Down Expand Up @@ -698,6 +716,9 @@ async def load_data_point_value(

def write_value(self, value: Any) -> tuple[ParameterT, ParameterT]:
"""Update value of the data_point."""

self._reset_tmp_value()

old_value = self._value
if value == NO_CACHE_ENTRY:
if self.refreshed_at != INIT_DATETIME:
Expand All @@ -716,6 +737,17 @@ def write_value(self, value: Any) -> tuple[ParameterT, ParameterT]:
self.fire_data_point_updated_callback()
return (old_value, new_value)

def write_temp_value(self, value: Any) -> None:
"""Update the temporary value of the data_point."""
temp_value = self._convert_value(value)
if self._value == temp_value:
self._set_refreshed_at()
else:
self._set_tmp_modified_at()
self._tmp_value = temp_value
self._state_uncertain = True
self.fire_data_point_updated_callback()

def update_parameter_data(self) -> None:
"""Update parameter data."""
if parameter_data := self._central.paramset_descriptions.get_parameter_data(
Expand Down Expand Up @@ -755,6 +787,11 @@ def _convert_value(self, value: Any) -> ParameterT:
)
return None # type: ignore[return-value]

def _reset_tmp_value(self) -> None:
"""Reset the temp storage."""
self._tmp_value = None # type: ignore[assignment]
self._set_modified_at(now=INIT_DATETIME)

def get_event_data(self, value: Any = None) -> dict[EventKey, Any]:
"""Get the event_data."""
event_data = {
Expand Down Expand Up @@ -801,28 +838,33 @@ def add_data_point(
data_point.parameter
] = value

async def send_data(self, wait_for_callback: int | None) -> bool:
async def send_data(self, wait_for_callback: int | None) -> set[DP_KEY_VALUE]:
"""Send data to backend."""
data_point_key_values: set[DP_KEY_VALUE] = set()
for paramset_key, paramsets in self._paramsets.items():
for paramset_no in dict(sorted(paramsets.items())).values():
for channel_address, paramset in paramset_no.items():
if len(paramset.values()) == 1:
for parameter, value in paramset.items():
await self._client.set_value(
data_point_key_values.update(
await self._client.set_value(
channel_address=channel_address,
paramset_key=paramset_key,
parameter=parameter,
value=value,
wait_for_callback=wait_for_callback,
)
)
else:
data_point_key_values.update(
await self._client.put_paramset(
channel_address=channel_address,
paramset_key=paramset_key,
parameter=parameter,
value=value,
values=paramset,
wait_for_callback=wait_for_callback,
)
else:
await self._client.put_paramset(
channel_address=channel_address,
paramset_key=paramset_key,
values=paramset,
wait_for_callback=wait_for_callback,
)
return True
return data_point_key_values


def bind_collector(
Expand Down
Loading