diff --git a/changelog.md b/changelog.md index 5050f468..77c7d774 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,6 @@ # Version 2024.11.4 (2024-11-19) +- Run periodic tasks with an individual interval - Store tmp value for polling client data points # Version 2024.11.3 (2024-11-18) @@ -9,7 +10,7 @@ - Add periodic data refresh to CentralUnitChecker for some interfaces - Add root path for virtual devices - Maintain data_cache by interface -- Reduce MAX_CACHE_AGE to 15s +- Reduce MAX_CACHE_AGE to 10s # Version 2024.11.2 (2024-11-17) diff --git a/hahomematic/caches/dynamic.py b/hahomematic/caches/dynamic.py index 55c1c1b7..55415287 100644 --- a/hahomematic/caches/dynamic.py +++ b/hahomematic/caches/dynamic.py @@ -138,7 +138,7 @@ def __init__(self, central: hmcu.CentralUnit) -> None: async def load(self, direct_call: bool = False) -> None: """Fetch names from backend.""" if direct_call is False and changed_within_seconds( - last_change=self._refreshed_at, max_age=int(MAX_CACHE_AGE / 2) + last_change=self._refreshed_at, max_age=int(MAX_CACHE_AGE / 3) ): return self.clear() @@ -238,15 +238,6 @@ def __init__(self, central: hmcu.CentralUnit) -> None: self._value_cache: Final[dict[Interface, dict[str, Any]]] = {} self._refreshed_at: Final[dict[Interface, datetime]] = {} - def is_empty(self, interface: Interface) -> bool: - """Return if cache is empty.""" - if len(self._value_cache) == 0: - return True - if not changed_within_seconds(last_change=self._get_refreshed_at(interface=interface)): - self.clear(interface=interface) - return True - return False - async def load(self, direct_call: bool = False, interface: Interface | None = None) -> None: """Fetch data from backend.""" _LOGGER.debug("load: Loading device data for %s", self._central.name) @@ -255,19 +246,24 @@ async def load(self, direct_call: bool = False, interface: Interface | None = No continue if direct_call is False and changed_within_seconds( last_change=self._get_refreshed_at(interface=client.interface), - max_age=int(MAX_CACHE_AGE / 2), + max_age=int(MAX_CACHE_AGE / 3), ): return await client.fetch_all_device_data() async def refresh_data_point_data( - self, paramset_key: ParamsetKey | None = None, interface: Interface | None = None + self, + paramset_key: ParamsetKey | None = None, + interface: Interface | None = None, + direct_call: bool = False, ) -> None: """Refresh data_point data.""" for data_point in self._central.get_readable_generic_data_points( paramset_key=paramset_key, interface=interface ): - await data_point.load_data_point_value(call_source=CallSource.HM_INIT) + await data_point.load_data_point_value( + call_source=CallSource.HM_INIT, direct_call=direct_call + ) def add_data(self, interface: Interface, all_device_data: dict[str, Any]) -> None: """Add data to cache.""" @@ -281,7 +277,7 @@ def get_data( parameter: str, ) -> Any: """Get data from cache.""" - if not self.is_empty(interface=interface): + if not self._is_empty(interface=interface): key = f"{interface}.{channel_address.replace(':','%3A')}.{parameter}" return self._value_cache[interface].get(key, NO_CACHE_ENTRY) return NO_CACHE_ENTRY @@ -299,6 +295,15 @@ def _get_refreshed_at(self, interface: Interface) -> datetime: """Return when cache has been refreshed.""" return self._refreshed_at.get(interface, INIT_DATETIME) + def _is_empty(self, interface: Interface) -> bool: + """Return if cache is empty.""" + if len(self._value_cache) == 0: + return True + if not changed_within_seconds(last_change=self._get_refreshed_at(interface=interface)): + self.clear(interface=interface) + return True + return False + class PingPongCache: """Cache to collect ping/pong events with ttl.""" diff --git a/hahomematic/central/__init__.py b/hahomematic/central/__init__.py index 4dab1738..80bcf80b 100644 --- a/hahomematic/central/__init__.py +++ b/hahomematic/central/__init__.py @@ -13,7 +13,6 @@ import logging from logging import DEBUG import threading -from time import sleep from typing import Any, Final, cast from aiohttp import ClientSession @@ -37,6 +36,7 @@ DEFAULT_INCLUDE_INTERNAL_PROGRAMS, DEFAULT_INCLUDE_INTERNAL_SYSVARS, DEFAULT_MAX_READ_WORKERS, + DEFAULT_PERIODIC_REFRESH_INTERVAL, DEFAULT_PROGRAM_SCAN_ENABLED, DEFAULT_SYSVAR_SCAN_ENABLED, DEFAULT_TLS, @@ -244,6 +244,13 @@ def parameter_visibility(self) -> ParameterVisibilityCache: """Return parameter_visibility cache.""" return self._parameter_visibility + @property + def poll_clients(self) -> tuple[hmcl.Client, ...]: + """Return clients that need to poll data.""" + return tuple( + client for client in self._clients.values() if not client.supports_push_updates + ) + @property def primary_client(self) -> hmcl.Client | None: """Return the primary client of the backend.""" @@ -651,7 +658,7 @@ async def _identify_ip_addr(self, port: int | None) -> str: _LOGGER.warning( "GET_IP_ADDR: Waiting for %i s,", config.CONNECTION_CHECKER_INTERVAL ) - await asyncio.sleep(config.CONNECTION_CHECKER_INTERVAL) + await asyncio.sleep(config.TIMEOUT / 10) return ip_addr def _start_connection_checker(self) -> None: @@ -1167,13 +1174,16 @@ async def fetch_program_data(self, scheduled: bool) -> None: @measure_execution_time async def load_and_refresh_data_point_data( - self, interface: Interface, paramset_key: ParamsetKey | None = None + self, + interface: Interface, + paramset_key: ParamsetKey | None = None, + direct_call: bool = False, ) -> None: """Refresh data_point data.""" - if paramset_key != ParamsetKey.MASTER and self._data_cache.is_empty(interface=interface): + if paramset_key != ParamsetKey.MASTER: await self._data_cache.load(interface=interface) await self._data_cache.refresh_data_point_data( - paramset_key=paramset_key, interface=interface + paramset_key=paramset_key, interface=interface, direct_call=direct_call ) async def get_system_variable(self, name: str) -> Any | None: @@ -1465,20 +1475,33 @@ def run(self) -> None: "run: Init connection checker to server %s", self._central.name, ) - while self._active: - self._central.looper.run_coroutine(self._check_connection(), name="check_connection") - self._central.looper.run_coroutine( - self._refresh_client_data(), name="refresh_client_data" + self._central.looper.create_task(self._run_check_connection(), name="check_connection") + if (poll_clients := self._central.poll_clients) is not None: + self._central.looper.create_task( + self._run_refresh_client_data(poll_clients=poll_clients), + name="refresh_client_data", ) - if self._active: - sleep(config.CONNECTION_CHECKER_INTERVAL) def stop(self) -> None: """To stop the ConnectionChecker.""" self._active = False - async def _check_connection(self) -> None: + async def _run_check_connection(self) -> None: """Periodically check connection to backend.""" + while self._active: + await self._check_connection() + if self._active: + await asyncio.sleep(config.CONNECTION_CHECKER_INTERVAL) + + async def _run_refresh_client_data(self, poll_clients: tuple[hmcl.Client, ...]) -> None: + """Periodically refresh client data.""" + while self._active: + await self._refresh_client_data(poll_clients=poll_clients) + if self._active: + await asyncio.sleep(self._central.config.periodic_refresh_interval) + + async def _check_connection(self) -> None: + """Check connection to backend.""" _LOGGER.debug( "CHECK_CONNECTION: Checking connection to server %s", self._central.name, @@ -1524,8 +1547,8 @@ async def _check_connection(self) -> None: reduce_args(args=ex.args), ) - async def _refresh_client_data(self) -> None: - """Periodically check connection to backend.""" + async def _refresh_client_data(self, poll_clients: tuple[hmcl.Client, ...]) -> None: + """Refresh client data.""" _LOGGER.debug( "REFRESH_CLIENT_DATA: Checking connection to server %s", self._central.name, @@ -1533,12 +1556,9 @@ async def _refresh_client_data(self) -> None: try: if not self._central.available: return - for client in self._central.clients: - if not client.supports_push_updates: - await self._central.load_and_refresh_data_point_data( - interface=client.interface - ) - self._central.set_last_event_dt(interface_id=client.interface_id) + for client in poll_clients: + await self._central.load_and_refresh_data_point_data(interface=client.interface) + self._central.set_last_event_dt(interface_id=client.interface_id) except NoConnectionException as nex: _LOGGER.error( @@ -1577,6 +1597,7 @@ def __init__( listen_ip_addr: str | None = None, listen_port: int | None = None, max_read_workers: int = DEFAULT_MAX_READ_WORKERS, + periodic_refresh_interval: int = DEFAULT_PERIODIC_REFRESH_INTERVAL, program_scan_enabled: bool = DEFAULT_PROGRAM_SCAN_ENABLED, start_direct: bool = False, sysvar_scan_enabled: bool = DEFAULT_SYSVAR_SCAN_ENABLED, @@ -1603,6 +1624,7 @@ def __init__( self.max_read_workers = max_read_workers self.name: Final = name self.password: Final = password + self.periodic_refresh_interval = periodic_refresh_interval self.program_scan_enabled: Final = program_scan_enabled self.start_direct: Final = start_direct self.storage_folder: Final = storage_folder diff --git a/hahomematic/const.py b/hahomematic/const.py index d5dca55c..851548cd 100644 --- a/hahomematic/const.py +++ b/hahomematic/const.py @@ -20,6 +20,7 @@ DEFAULT_LAST_COMMAND_SEND_STORE_TIMEOUT: Final = 60 DEFAULT_MAX_READ_WORKERS: Final = 1 DEFAULT_MAX_WORKERS: Final = 1 +DEFAULT_PERIODIC_REFRESH_INTERVAL: Final = 15 DEFAULT_PING_PONG_MISMATCH_COUNT: Final = 15 DEFAULT_PING_PONG_MISMATCH_COUNT_TTL: Final = 300 DEFAULT_PROGRAM_SCAN_ENABLED: Final = True @@ -29,7 +30,9 @@ DEFAULT_TLS: Final = False DEFAULT_VERIFY_TLS: Final = False DEFAULT_WAIT_FOR_CALLBACK: Final[int | None] = None -MAX_WAIT_FOR_CALLBACK: Final = 600 + +MAX_WAIT_FOR_CALLBACK: Final = 60 +MAX_CACHE_AGE: Final = 10 REGA_SCRIPT_FETCH_ALL_DEVICE_DATA: Final = "fetch_all_device_data.fn" REGA_SCRIPT_GET_SERIAL: Final = "get_serial.fn" @@ -90,7 +93,6 @@ VIRTDEV_SET_PATH_ROOT: Final = "virtdev/set" VIRTDEV_STATE_PATH_ROOT: Final = "virtdev/status" -MAX_CACHE_AGE: Final = 10 NO_CACHE_ENTRY: Final = "NO_CACHE_ENTRY" diff --git a/hahomematic/model/device.py b/hahomematic/model/device.py index 3f68d919..7b1a1a96 100644 --- a/hahomematic/model/device.py +++ b/hahomematic/model/device.py @@ -1043,11 +1043,12 @@ async def get_value( ) except BaseHomematicException as ex: _LOGGER.debug( - "GET_OR_LOAD_VALUE: Failed to get data for %s, %s, %s: %s", + "GET_OR_LOAD_VALUE: Failed to get data for %s, %s, %s, %s: %s", self._device.model, channel_address, parameter, - ex, + call_source, + reduce_args(args=ex.args), ) for d_parameter, d_value in value_dict.items(): self._add_entry_to_device_cache(