diff --git a/changelog.md b/changelog.md index 11694cbb..97f8f95c 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,7 @@ +# Version 2024.8.12 (2024-08-24) + +- Add additional validation on config parameters + # Version 2024.8.11 (2024-08-21) - Make HEATING_COOLING visible for thermostats diff --git a/hahomematic/central/__init__.py b/hahomematic/central/__init__.py index 19f08c21..58d829db 100644 --- a/hahomematic/central/__init__.py +++ b/hahomematic/central/__init__.py @@ -1401,22 +1401,25 @@ def use_caches(self) -> bool: """Return if caches should be used.""" return self.start_direct is False - def check_config(self, extended_validation: bool = True) -> None: + def check_config(self) -> None: """Check config. Throws BaseHomematicException on failure.""" if config_failures := check_config( central_name=self.name, + host=self.host, username=self.username, password=self.password, storage_folder=self.storage_folder, - extended_validation=extended_validation, + callback_host=self.callback_host, + callback_port=self.callback_port, + json_port=self.json_port, ): failures = ", ".join(config_failures) raise HaHomematicConfigException(failures) - def create_central(self, extended_validation: bool = True) -> CentralUnit: + def create_central(self) -> CentralUnit: """Create the central. Throws BaseHomematicException on validation failure.""" try: - self.check_config(extended_validation=extended_validation) + self.check_config() return CentralUnit(self) except BaseHomematicException as bhex: _LOGGER.warning("CREATE_CENTRAL: Not able to create a central: %s", bhex) diff --git a/hahomematic/support.py b/hahomematic/support.py index 59d7d8f8..f15c9f9a 100644 --- a/hahomematic/support.py +++ b/hahomematic/support.py @@ -8,6 +8,7 @@ from dataclasses import dataclass from datetime import datetime from functools import lru_cache +from ipaddress import IPv4Address import logging import os import re @@ -67,19 +68,25 @@ def build_headers( def check_config( - central_name: str | None, - username: str | None, - password: str | None, + central_name: str, + host: str, + username: str, + password: str, storage_folder: str, - extended_validation: bool = True, + callback_host: str | None, + callback_port: int | None, + json_port: int | None, ) -> list[str]: """Check config. Throws BaseHomematicException on failure.""" config_failures: list[str] = [] - if extended_validation and central_name and IDENTIFIER_SEPARATOR in central_name: + if central_name and IDENTIFIER_SEPARATOR in central_name: config_failures.append(f"Instance name must not contain {IDENTIFIER_SEPARATOR}") + + if not (is_valid_hostname(hostname=host) or is_valid_ipv4_address(address=host)): + config_failures.append("Invalid hostname or ipv4 address") if not username: config_failures.append("Username must not be empty") - if password is None: + if not password: config_failures.append("Password is required") if not check_password(password): config_failures.append("Password is not valid") @@ -87,6 +94,14 @@ def check_config( check_or_create_directory(storage_folder) except BaseHomematicException as haex: config_failures.append(reduce_args(haex.args)[0]) + if callback_host and not ( + is_valid_hostname(hostname=callback_host) or is_valid_ipv4_address(address=callback_host) + ): + config_failures.append("Invalid callback hostname or ipv4 address") + if callback_port and not is_valid_port(port=callback_port): + config_failures.append("Invalid callback port") + if json_port and not is_valid_port(port=json_port): + config_failures.append("Invalid json port") return config_failures @@ -232,6 +247,38 @@ def get_ip_addr(host: str, port: int) -> str | None: return local_ip +def is_valid_hostname(hostname: str) -> bool: + """Return True if hostname is valid.""" + if hostname[-1] == ".": + # strip exactly one dot from the right, if present + hostname = hostname[:-1] + if len(hostname) > 253 or len(hostname) < 1: + return False + + labels = hostname.split(".") + + # the TLD must be not all-numeric + if re.match(r"[0-9]+$", labels[-1]): + return False + + allowed = re.compile(r"(?!-)[a-z0-9-]{1,63}(? bool: + """Return True if ipv4_address is valid.""" + try: + IPv4Address(address=address) + except ValueError: + return False + return True + + +def is_valid_port(port: int) -> bool: + """Return True if port is valid.""" + return 0 <= port <= 65535 + + def element_matches_key( search_elements: str | Collection[str], compare_with: str | None, diff --git a/pyproject.toml b/pyproject.toml index a1d1684f..ca1da918 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "hahomematic" -version = "2024.8.11" +version = "2024.8.12" license = {text = "MIT License"} description = "Homematic interface for Home Assistant running on Python 3." readme = "README.md" diff --git a/tests/test_support.py b/tests/test_support.py index 7d6c63dc..5fe4d869 100644 --- a/tests/test_support.py +++ b/tests/test_support.py @@ -34,6 +34,8 @@ find_free_port, get_channel_no, get_tls_context, + is_valid_hostname, + is_valid_ipv4_address, parse_sys_var, to_bool, ) @@ -544,3 +546,24 @@ def test_converter( assert converter(input_value) == result_value if re_converter := _COMBINED_PARAMETER_TO_HM_CONVERTER.get(parameter): assert re_converter(result_value) == input_value + + +def test_is_valid_hostname() -> None: + """Test is_valid_hostname.""" + assert is_valid_hostname(" ") is False + assert is_valid_hostname("123") is False + assert is_valid_hostname("ccu") is True + assert is_valid_hostname("ccu.test.de") is True + assert is_valid_hostname("ccu.de") is True + assert is_valid_hostname("ccu.123") is False + assert is_valid_hostname("192.168.178.2") is False + assert is_valid_hostname("5422eb72-raspberrymatic") is True + + +def test_is_valid_ipv4_address() -> None: + """Test is_valid_ipv4_address.""" + assert is_valid_ipv4_address("") is False + assert is_valid_ipv4_address(" ") is False + assert is_valid_ipv4_address("192.168.1782") is False + assert is_valid_ipv4_address("192.168.178.2") is True + assert is_valid_ipv4_address("ccu") is False