Skip to content

Commit

Permalink
Run periodic tasks with an individual interval (#1859)
Browse files Browse the repository at this point in the history
* Run periodic tasks with an individual interval

* Update const.py

* Start tasks only when poll clients exist

* Update changelog.md

* Update const.py
  • Loading branch information
SukramJ authored Nov 19, 2024
1 parent 183e6c0 commit 922eeb7
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 39 deletions.
3 changes: 2 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Version 2024.11.4 (2024-11-19)

- Run periodic tasks with an individual interval
- Store tmp value for polling client data points

# Version 2024.11.3 (2024-11-18)
Expand All @@ -9,7 +10,7 @@
- Add periodic data refresh to CentralUnitChecker for some interfaces
- Add root path for virtual devices
- Maintain data_cache by interface
- Reduce MAX_CACHE_AGE to 15s
- Reduce MAX_CACHE_AGE to 10s

# Version 2024.11.2 (2024-11-17)

Expand Down
33 changes: 19 additions & 14 deletions hahomematic/caches/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __init__(self, central: hmcu.CentralUnit) -> None:
async def load(self, direct_call: bool = False) -> None:
"""Fetch names from backend."""
if direct_call is False and changed_within_seconds(
last_change=self._refreshed_at, max_age=int(MAX_CACHE_AGE / 2)
last_change=self._refreshed_at, max_age=int(MAX_CACHE_AGE / 3)
):
return
self.clear()
Expand Down Expand Up @@ -238,15 +238,6 @@ def __init__(self, central: hmcu.CentralUnit) -> None:
self._value_cache: Final[dict[Interface, dict[str, Any]]] = {}
self._refreshed_at: Final[dict[Interface, datetime]] = {}

def is_empty(self, interface: Interface) -> bool:
"""Return if cache is empty."""
if len(self._value_cache) == 0:
return True
if not changed_within_seconds(last_change=self._get_refreshed_at(interface=interface)):
self.clear(interface=interface)
return True
return False

async def load(self, direct_call: bool = False, interface: Interface | None = None) -> None:
"""Fetch data from backend."""
_LOGGER.debug("load: Loading device data for %s", self._central.name)
Expand All @@ -255,19 +246,24 @@ async def load(self, direct_call: bool = False, interface: Interface | None = No
continue
if direct_call is False and changed_within_seconds(
last_change=self._get_refreshed_at(interface=client.interface),
max_age=int(MAX_CACHE_AGE / 2),
max_age=int(MAX_CACHE_AGE / 3),
):
return
await client.fetch_all_device_data()

async def refresh_data_point_data(
self, paramset_key: ParamsetKey | None = None, interface: Interface | None = None
self,
paramset_key: ParamsetKey | None = None,
interface: Interface | None = None,
direct_call: bool = False,
) -> None:
"""Refresh data_point data."""
for data_point in self._central.get_readable_generic_data_points(
paramset_key=paramset_key, interface=interface
):
await data_point.load_data_point_value(call_source=CallSource.HM_INIT)
await data_point.load_data_point_value(
call_source=CallSource.HM_INIT, direct_call=direct_call
)

def add_data(self, interface: Interface, all_device_data: dict[str, Any]) -> None:
"""Add data to cache."""
Expand All @@ -281,7 +277,7 @@ def get_data(
parameter: str,
) -> Any:
"""Get data from cache."""
if not self.is_empty(interface=interface):
if not self._is_empty(interface=interface):
key = f"{interface}.{channel_address.replace(':','%3A')}.{parameter}"
return self._value_cache[interface].get(key, NO_CACHE_ENTRY)
return NO_CACHE_ENTRY
Expand All @@ -299,6 +295,15 @@ def _get_refreshed_at(self, interface: Interface) -> datetime:
"""Return when cache has been refreshed."""
return self._refreshed_at.get(interface, INIT_DATETIME)

def _is_empty(self, interface: Interface) -> bool:
"""Return if cache is empty."""
if len(self._value_cache) == 0:
return True
if not changed_within_seconds(last_change=self._get_refreshed_at(interface=interface)):
self.clear(interface=interface)
return True
return False


class PingPongCache:
"""Cache to collect ping/pong events with ttl."""
Expand Down
62 changes: 42 additions & 20 deletions hahomematic/central/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import logging
from logging import DEBUG
import threading
from time import sleep
from typing import Any, Final, cast

from aiohttp import ClientSession
Expand All @@ -37,6 +36,7 @@
DEFAULT_INCLUDE_INTERNAL_PROGRAMS,
DEFAULT_INCLUDE_INTERNAL_SYSVARS,
DEFAULT_MAX_READ_WORKERS,
DEFAULT_PERIODIC_REFRESH_INTERVAL,
DEFAULT_PROGRAM_SCAN_ENABLED,
DEFAULT_SYSVAR_SCAN_ENABLED,
DEFAULT_TLS,
Expand Down Expand Up @@ -244,6 +244,13 @@ def parameter_visibility(self) -> ParameterVisibilityCache:
"""Return parameter_visibility cache."""
return self._parameter_visibility

@property
def poll_clients(self) -> tuple[hmcl.Client, ...]:
"""Return clients that need to poll data."""
return tuple(
client for client in self._clients.values() if not client.supports_push_updates
)

@property
def primary_client(self) -> hmcl.Client | None:
"""Return the primary client of the backend."""
Expand Down Expand Up @@ -651,7 +658,7 @@ async def _identify_ip_addr(self, port: int | None) -> str:
_LOGGER.warning(
"GET_IP_ADDR: Waiting for %i s,", config.CONNECTION_CHECKER_INTERVAL
)
await asyncio.sleep(config.CONNECTION_CHECKER_INTERVAL)
await asyncio.sleep(config.TIMEOUT / 10)
return ip_addr

def _start_connection_checker(self) -> None:
Expand Down Expand Up @@ -1167,13 +1174,16 @@ async def fetch_program_data(self, scheduled: bool) -> None:

@measure_execution_time
async def load_and_refresh_data_point_data(
self, interface: Interface, paramset_key: ParamsetKey | None = None
self,
interface: Interface,
paramset_key: ParamsetKey | None = None,
direct_call: bool = False,
) -> None:
"""Refresh data_point data."""
if paramset_key != ParamsetKey.MASTER and self._data_cache.is_empty(interface=interface):
if paramset_key != ParamsetKey.MASTER:
await self._data_cache.load(interface=interface)
await self._data_cache.refresh_data_point_data(
paramset_key=paramset_key, interface=interface
paramset_key=paramset_key, interface=interface, direct_call=direct_call
)

async def get_system_variable(self, name: str) -> Any | None:
Expand Down Expand Up @@ -1465,20 +1475,33 @@ def run(self) -> None:
"run: Init connection checker to server %s",
self._central.name,
)
while self._active:
self._central.looper.run_coroutine(self._check_connection(), name="check_connection")
self._central.looper.run_coroutine(
self._refresh_client_data(), name="refresh_client_data"
self._central.looper.create_task(self._run_check_connection(), name="check_connection")
if (poll_clients := self._central.poll_clients) is not None:
self._central.looper.create_task(
self._run_refresh_client_data(poll_clients=poll_clients),
name="refresh_client_data",
)
if self._active:
sleep(config.CONNECTION_CHECKER_INTERVAL)

def stop(self) -> None:
"""To stop the ConnectionChecker."""
self._active = False

async def _check_connection(self) -> None:
async def _run_check_connection(self) -> None:
"""Periodically check connection to backend."""
while self._active:
await self._check_connection()
if self._active:
await asyncio.sleep(config.CONNECTION_CHECKER_INTERVAL)

async def _run_refresh_client_data(self, poll_clients: tuple[hmcl.Client, ...]) -> None:
"""Periodically refresh client data."""
while self._active:
await self._refresh_client_data(poll_clients=poll_clients)
if self._active:
await asyncio.sleep(self._central.config.periodic_refresh_interval)

async def _check_connection(self) -> None:
"""Check connection to backend."""
_LOGGER.debug(
"CHECK_CONNECTION: Checking connection to server %s",
self._central.name,
Expand Down Expand Up @@ -1524,21 +1547,18 @@ async def _check_connection(self) -> None:
reduce_args(args=ex.args),
)

async def _refresh_client_data(self) -> None:
"""Periodically check connection to backend."""
async def _refresh_client_data(self, poll_clients: tuple[hmcl.Client, ...]) -> None:
"""Refresh client data."""
_LOGGER.debug(
"REFRESH_CLIENT_DATA: Checking connection to server %s",
self._central.name,
)
try:
if not self._central.available:
return
for client in self._central.clients:
if not client.supports_push_updates:
await self._central.load_and_refresh_data_point_data(
interface=client.interface
)
self._central.set_last_event_dt(interface_id=client.interface_id)
for client in poll_clients:
await self._central.load_and_refresh_data_point_data(interface=client.interface)
self._central.set_last_event_dt(interface_id=client.interface_id)

except NoConnectionException as nex:
_LOGGER.error(
Expand Down Expand Up @@ -1577,6 +1597,7 @@ def __init__(
listen_ip_addr: str | None = None,
listen_port: int | None = None,
max_read_workers: int = DEFAULT_MAX_READ_WORKERS,
periodic_refresh_interval: int = DEFAULT_PERIODIC_REFRESH_INTERVAL,
program_scan_enabled: bool = DEFAULT_PROGRAM_SCAN_ENABLED,
start_direct: bool = False,
sysvar_scan_enabled: bool = DEFAULT_SYSVAR_SCAN_ENABLED,
Expand All @@ -1603,6 +1624,7 @@ def __init__(
self.max_read_workers = max_read_workers
self.name: Final = name
self.password: Final = password
self.periodic_refresh_interval = periodic_refresh_interval
self.program_scan_enabled: Final = program_scan_enabled
self.start_direct: Final = start_direct
self.storage_folder: Final = storage_folder
Expand Down
6 changes: 4 additions & 2 deletions hahomematic/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
DEFAULT_LAST_COMMAND_SEND_STORE_TIMEOUT: Final = 60
DEFAULT_MAX_READ_WORKERS: Final = 1
DEFAULT_MAX_WORKERS: Final = 1
DEFAULT_PERIODIC_REFRESH_INTERVAL: Final = 15
DEFAULT_PING_PONG_MISMATCH_COUNT: Final = 15
DEFAULT_PING_PONG_MISMATCH_COUNT_TTL: Final = 300
DEFAULT_PROGRAM_SCAN_ENABLED: Final = True
Expand All @@ -29,7 +30,9 @@
DEFAULT_TLS: Final = False
DEFAULT_VERIFY_TLS: Final = False
DEFAULT_WAIT_FOR_CALLBACK: Final[int | None] = None
MAX_WAIT_FOR_CALLBACK: Final = 600

MAX_WAIT_FOR_CALLBACK: Final = 60
MAX_CACHE_AGE: Final = 10

REGA_SCRIPT_FETCH_ALL_DEVICE_DATA: Final = "fetch_all_device_data.fn"
REGA_SCRIPT_GET_SERIAL: Final = "get_serial.fn"
Expand Down Expand Up @@ -90,7 +93,6 @@
VIRTDEV_SET_PATH_ROOT: Final = "virtdev/set"
VIRTDEV_STATE_PATH_ROOT: Final = "virtdev/status"

MAX_CACHE_AGE: Final = 10

NO_CACHE_ENTRY: Final = "NO_CACHE_ENTRY"

Expand Down
5 changes: 3 additions & 2 deletions hahomematic/model/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,11 +1043,12 @@ async def get_value(
)
except BaseHomematicException as ex:
_LOGGER.debug(
"GET_OR_LOAD_VALUE: Failed to get data for %s, %s, %s: %s",
"GET_OR_LOAD_VALUE: Failed to get data for %s, %s, %s, %s: %s",
self._device.model,
channel_address,
parameter,
ex,
call_source,
reduce_args(args=ex.args),
)
for d_parameter, d_value in value_dict.items():
self._add_entry_to_device_cache(
Expand Down

0 comments on commit 922eeb7

Please sign in to comment.