Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run periodic tasks with an individual interval #1859

Merged
merged 5 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Version 2024.11.4 (2024-11-19)

- Run periodic tasks with an individual interval
- Store tmp value for polling client data points

# Version 2024.11.3 (2024-11-18)
Expand All @@ -9,7 +10,7 @@
- Add periodic data refresh to CentralUnitChecker for some interfaces
- Add root path for virtual devices
- Maintain data_cache by interface
- Reduce MAX_CACHE_AGE to 15s
- Reduce MAX_CACHE_AGE to 10s

# Version 2024.11.2 (2024-11-17)

Expand Down
33 changes: 19 additions & 14 deletions hahomematic/caches/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __init__(self, central: hmcu.CentralUnit) -> None:
async def load(self, direct_call: bool = False) -> None:
"""Fetch names from backend."""
if direct_call is False and changed_within_seconds(
last_change=self._refreshed_at, max_age=int(MAX_CACHE_AGE / 2)
last_change=self._refreshed_at, max_age=int(MAX_CACHE_AGE / 3)
):
return
self.clear()
Expand Down Expand Up @@ -238,15 +238,6 @@ def __init__(self, central: hmcu.CentralUnit) -> None:
self._value_cache: Final[dict[Interface, dict[str, Any]]] = {}
self._refreshed_at: Final[dict[Interface, datetime]] = {}

def is_empty(self, interface: Interface) -> bool:
"""Return if cache is empty."""
if len(self._value_cache) == 0:
return True
if not changed_within_seconds(last_change=self._get_refreshed_at(interface=interface)):
self.clear(interface=interface)
return True
return False

async def load(self, direct_call: bool = False, interface: Interface | None = None) -> None:
"""Fetch data from backend."""
_LOGGER.debug("load: Loading device data for %s", self._central.name)
Expand All @@ -255,19 +246,24 @@ async def load(self, direct_call: bool = False, interface: Interface | None = No
continue
if direct_call is False and changed_within_seconds(
last_change=self._get_refreshed_at(interface=client.interface),
max_age=int(MAX_CACHE_AGE / 2),
max_age=int(MAX_CACHE_AGE / 3),
):
return
await client.fetch_all_device_data()

async def refresh_data_point_data(
self, paramset_key: ParamsetKey | None = None, interface: Interface | None = None
self,
paramset_key: ParamsetKey | None = None,
interface: Interface | None = None,
direct_call: bool = False,
) -> None:
"""Refresh data_point data."""
for data_point in self._central.get_readable_generic_data_points(
paramset_key=paramset_key, interface=interface
):
await data_point.load_data_point_value(call_source=CallSource.HM_INIT)
await data_point.load_data_point_value(
call_source=CallSource.HM_INIT, direct_call=direct_call
)

def add_data(self, interface: Interface, all_device_data: dict[str, Any]) -> None:
"""Add data to cache."""
Expand All @@ -281,7 +277,7 @@ def get_data(
parameter: str,
) -> Any:
"""Get data from cache."""
if not self.is_empty(interface=interface):
if not self._is_empty(interface=interface):
key = f"{interface}.{channel_address.replace(':','%3A')}.{parameter}"
return self._value_cache[interface].get(key, NO_CACHE_ENTRY)
return NO_CACHE_ENTRY
Expand All @@ -299,6 +295,15 @@ def _get_refreshed_at(self, interface: Interface) -> datetime:
"""Return when cache has been refreshed."""
return self._refreshed_at.get(interface, INIT_DATETIME)

def _is_empty(self, interface: Interface) -> bool:
"""Return if cache is empty."""
if len(self._value_cache) == 0:
return True
if not changed_within_seconds(last_change=self._get_refreshed_at(interface=interface)):
self.clear(interface=interface)
return True
return False


class PingPongCache:
"""Cache to collect ping/pong events with ttl."""
Expand Down
62 changes: 42 additions & 20 deletions hahomematic/central/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import logging
from logging import DEBUG
import threading
from time import sleep
from typing import Any, Final, cast

from aiohttp import ClientSession
Expand All @@ -37,6 +36,7 @@
DEFAULT_INCLUDE_INTERNAL_PROGRAMS,
DEFAULT_INCLUDE_INTERNAL_SYSVARS,
DEFAULT_MAX_READ_WORKERS,
DEFAULT_PERIODIC_REFRESH_INTERVAL,
DEFAULT_PROGRAM_SCAN_ENABLED,
DEFAULT_SYSVAR_SCAN_ENABLED,
DEFAULT_TLS,
Expand Down Expand Up @@ -244,6 +244,13 @@ def parameter_visibility(self) -> ParameterVisibilityCache:
"""Return parameter_visibility cache."""
return self._parameter_visibility

@property
def poll_clients(self) -> tuple[hmcl.Client, ...]:
"""Return clients that need to poll data."""
return tuple(
client for client in self._clients.values() if not client.supports_push_updates
)

@property
def primary_client(self) -> hmcl.Client | None:
"""Return the primary client of the backend."""
Expand Down Expand Up @@ -651,7 +658,7 @@ async def _identify_ip_addr(self, port: int | None) -> str:
_LOGGER.warning(
"GET_IP_ADDR: Waiting for %i s,", config.CONNECTION_CHECKER_INTERVAL
)
await asyncio.sleep(config.CONNECTION_CHECKER_INTERVAL)
await asyncio.sleep(config.TIMEOUT / 10)
return ip_addr

def _start_connection_checker(self) -> None:
Expand Down Expand Up @@ -1167,13 +1174,16 @@ async def fetch_program_data(self, scheduled: bool) -> None:

@measure_execution_time
async def load_and_refresh_data_point_data(
self, interface: Interface, paramset_key: ParamsetKey | None = None
self,
interface: Interface,
paramset_key: ParamsetKey | None = None,
direct_call: bool = False,
) -> None:
"""Refresh data_point data."""
if paramset_key != ParamsetKey.MASTER and self._data_cache.is_empty(interface=interface):
if paramset_key != ParamsetKey.MASTER:
await self._data_cache.load(interface=interface)
await self._data_cache.refresh_data_point_data(
paramset_key=paramset_key, interface=interface
paramset_key=paramset_key, interface=interface, direct_call=direct_call
)

async def get_system_variable(self, name: str) -> Any | None:
Expand Down Expand Up @@ -1465,20 +1475,33 @@ def run(self) -> None:
"run: Init connection checker to server %s",
self._central.name,
)
while self._active:
self._central.looper.run_coroutine(self._check_connection(), name="check_connection")
self._central.looper.run_coroutine(
self._refresh_client_data(), name="refresh_client_data"
self._central.looper.create_task(self._run_check_connection(), name="check_connection")
if (poll_clients := self._central.poll_clients) is not None:
self._central.looper.create_task(
self._run_refresh_client_data(poll_clients=poll_clients),
name="refresh_client_data",
)
if self._active:
sleep(config.CONNECTION_CHECKER_INTERVAL)

def stop(self) -> None:
"""To stop the ConnectionChecker."""
self._active = False

async def _check_connection(self) -> None:
async def _run_check_connection(self) -> None:
"""Periodically check connection to backend."""
while self._active:
await self._check_connection()
if self._active:
await asyncio.sleep(config.CONNECTION_CHECKER_INTERVAL)

async def _run_refresh_client_data(self, poll_clients: tuple[hmcl.Client, ...]) -> None:
"""Periodically refresh client data."""
while self._active:
await self._refresh_client_data(poll_clients=poll_clients)
if self._active:
await asyncio.sleep(self._central.config.periodic_refresh_interval)

async def _check_connection(self) -> None:
"""Check connection to backend."""
_LOGGER.debug(
"CHECK_CONNECTION: Checking connection to server %s",
self._central.name,
Expand Down Expand Up @@ -1524,21 +1547,18 @@ async def _check_connection(self) -> None:
reduce_args(args=ex.args),
)

async def _refresh_client_data(self) -> None:
"""Periodically check connection to backend."""
async def _refresh_client_data(self, poll_clients: tuple[hmcl.Client, ...]) -> None:
"""Refresh client data."""
_LOGGER.debug(
"REFRESH_CLIENT_DATA: Checking connection to server %s",
self._central.name,
)
try:
if not self._central.available:
return
for client in self._central.clients:
if not client.supports_push_updates:
await self._central.load_and_refresh_data_point_data(
interface=client.interface
)
self._central.set_last_event_dt(interface_id=client.interface_id)
for client in poll_clients:
await self._central.load_and_refresh_data_point_data(interface=client.interface)
self._central.set_last_event_dt(interface_id=client.interface_id)

except NoConnectionException as nex:
_LOGGER.error(
Expand Down Expand Up @@ -1577,6 +1597,7 @@ def __init__(
listen_ip_addr: str | None = None,
listen_port: int | None = None,
max_read_workers: int = DEFAULT_MAX_READ_WORKERS,
periodic_refresh_interval: int = DEFAULT_PERIODIC_REFRESH_INTERVAL,
program_scan_enabled: bool = DEFAULT_PROGRAM_SCAN_ENABLED,
start_direct: bool = False,
sysvar_scan_enabled: bool = DEFAULT_SYSVAR_SCAN_ENABLED,
Expand All @@ -1603,6 +1624,7 @@ def __init__(
self.max_read_workers = max_read_workers
self.name: Final = name
self.password: Final = password
self.periodic_refresh_interval = periodic_refresh_interval
self.program_scan_enabled: Final = program_scan_enabled
self.start_direct: Final = start_direct
self.storage_folder: Final = storage_folder
Expand Down
6 changes: 4 additions & 2 deletions hahomematic/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
DEFAULT_LAST_COMMAND_SEND_STORE_TIMEOUT: Final = 60
DEFAULT_MAX_READ_WORKERS: Final = 1
DEFAULT_MAX_WORKERS: Final = 1
DEFAULT_PERIODIC_REFRESH_INTERVAL: Final = 15
DEFAULT_PING_PONG_MISMATCH_COUNT: Final = 15
DEFAULT_PING_PONG_MISMATCH_COUNT_TTL: Final = 300
DEFAULT_PROGRAM_SCAN_ENABLED: Final = True
Expand All @@ -29,7 +30,9 @@
DEFAULT_TLS: Final = False
DEFAULT_VERIFY_TLS: Final = False
DEFAULT_WAIT_FOR_CALLBACK: Final[int | None] = None
MAX_WAIT_FOR_CALLBACK: Final = 600

MAX_WAIT_FOR_CALLBACK: Final = 60
MAX_CACHE_AGE: Final = 10

REGA_SCRIPT_FETCH_ALL_DEVICE_DATA: Final = "fetch_all_device_data.fn"
REGA_SCRIPT_GET_SERIAL: Final = "get_serial.fn"
Expand Down Expand Up @@ -90,7 +93,6 @@
VIRTDEV_SET_PATH_ROOT: Final = "virtdev/set"
VIRTDEV_STATE_PATH_ROOT: Final = "virtdev/status"

MAX_CACHE_AGE: Final = 10

NO_CACHE_ENTRY: Final = "NO_CACHE_ENTRY"

Expand Down
5 changes: 3 additions & 2 deletions hahomematic/model/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,11 +1043,12 @@ async def get_value(
)
except BaseHomematicException as ex:
_LOGGER.debug(
"GET_OR_LOAD_VALUE: Failed to get data for %s, %s, %s: %s",
"GET_OR_LOAD_VALUE: Failed to get data for %s, %s, %s, %s: %s",
self._device.model,
channel_address,
parameter,
ex,
call_source,
reduce_args(args=ex.args),
)
for d_parameter, d_value in value_dict.items():
self._add_entry_to_device_cache(
Expand Down