Skip to content

Commit

Permalink
Decompose combined parameter (#1513)
Browse files Browse the repository at this point in the history
  • Loading branch information
SukramJ authored Apr 17, 2024
1 parent d5bda72 commit 84965e1
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 9 deletions.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Version 2024.4.9 (2024-04-16)

- Decompose combined parameter

# Version 2024.4.8 (2024-04-13)

- Make entity event async
Expand Down
20 changes: 20 additions & 0 deletions hahomematic/caches/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
InterfaceName,
ParamsetKey,
)
from hahomematic.converter import CONVERTABLE_PARAMETERS, convert_combined_parameter_to_paramset
from hahomematic.platforms.device import HmDevice
from hahomematic.support import changed_within_seconds, get_channel_no, get_device_address

Expand All @@ -52,6 +53,12 @@ def add_set_value(
value: Any,
) -> None:
"""Add data from set value command."""
if parameter in CONVERTABLE_PARAMETERS:
self.add_combined_parameter(
parameter=parameter, channel_address=channel_address, combined_parameter=value
)
return

key = (
ParamsetKey.VALUES.value,
get_device_address(channel_address),
Expand All @@ -77,6 +84,19 @@ def add_put_paramset(
)
self._last_send_command[key] = (value, datetime.now())

def add_combined_parameter(
self, parameter: str, channel_address: str, combined_parameter: str
) -> None:
"""Add data from combined parameter."""
if values := convert_combined_parameter_to_paramset(
parameter=parameter, cpv=combined_parameter
):
self.add_put_paramset(
channel_address=channel_address,
paramset_key=ParamsetKey.VALUES.value,
values=values,
)

def get_last_value_send(
self,
paramset_key: str,
Expand Down
67 changes: 67 additions & 0 deletions hahomematic/converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Converters used by hahomematic."""

from __future__ import annotations

import ast
from typing import Any, Final, cast

from hahomematic.const import Parameter


def _convert_cpv_to_hm_level(cpv: Any) -> Any:
"""Convert combined parameter value for hm level."""
if isinstance(cpv, str) and cpv.startswith("0x"):
return ast.literal_eval(cpv) / 100 / 2
return cpv


def convert_hm_level_to_cpv(hm_level: Any) -> Any:
"""Convert hm level to combined parameter value."""
return format(int(hm_level * 100 * 2), "#04x")


CONVERTABLE_PARAMETERS: Final = (Parameter.COMBINED_PARAMETER, Parameter.LEVEL_COMBINED)

_COMBINED_PARAMETER_TO_HM_CONVERTER: Final = {
Parameter.LEVEL_COMBINED: _convert_cpv_to_hm_level,
}

_COMBINED_PARAMETER_NAMES: Final = {"L": Parameter.LEVEL, "L2": Parameter.LEVEL_2}


def _convert_combined_parameter_to_paramset(cpv: str) -> dict[str, Any]:
"""Convert combined parameter to paramset."""
paramset: dict[str, Any] = {}
for cp_param_value in cpv.split(","):
cp_param, value = cp_param_value.split("=")
if parameter := _COMBINED_PARAMETER_NAMES.get(cp_param):
if converter := _COMBINED_PARAMETER_TO_HM_CONVERTER.get(parameter):
paramset[parameter] = converter(value)
else:
paramset[parameter] = value
return paramset


def _convert_level_combined_to_paramset(lcv: str) -> dict[str, Any]:
"""Convert combined parameter to paramset."""
if "," in lcv:
l1_value, l2_value = lcv.split(",")
if converter := _COMBINED_PARAMETER_TO_HM_CONVERTER.get(Parameter.LEVEL_COMBINED):
return {
Parameter.LEVEL: converter(l1_value),
Parameter.LEVEL_SLATS: converter(l2_value),
}
return {}


_COMBINED_PARAMETER_TO_PARAMSET_CONVERTER: Final = {
Parameter.COMBINED_PARAMETER: _convert_combined_parameter_to_paramset,
Parameter.LEVEL_COMBINED: _convert_level_combined_to_paramset,
}


def convert_combined_parameter_to_paramset(parameter: str, cpv: str) -> dict[str, Any]:
"""Convert combined parameter to paramset."""
if converter := _COMBINED_PARAMETER_TO_PARAMSET_CONVERTER.get(parameter): # type: ignore[call-overload]
return cast(dict[str, Any], converter(cpv))
return {}
5 changes: 3 additions & 2 deletions hahomematic/platforms/custom/cover.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing import Any, Final

from hahomematic.const import EntityUsage, HmPlatform, Parameter
from hahomematic.converter import convert_hm_level_to_cpv
from hahomematic.platforms import device as hmd
from hahomematic.platforms.custom import definition as hmed
from hahomematic.platforms.custom.const import DeviceProfile, Field
Expand Down Expand Up @@ -336,9 +337,9 @@ def _get_combined_value(
levels: list[str] = []
# the resulting hex value is based on the doubled position
if level is not None:
levels.append(format(int(level * 100 * 2), "#04x"))
levels.append(convert_hm_level_to_cpv(hm_level=level))
if tilt_level is not None:
levels.append(format(int(tilt_level * 100 * 2), "#04x"))
levels.append(convert_hm_level_to_cpv(hm_level=tilt_level))

if levels:
return ",".join(levels)
Expand Down
6 changes: 0 additions & 6 deletions hahomematic/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import asyncio
import base64
from collections.abc import Collection
import contextlib
Expand Down Expand Up @@ -293,11 +292,6 @@ def is_valid(self) -> bool:
return changed_within_seconds(last_change=self.last_refresh)


def cancelling(task: asyncio.Future[Any]) -> bool:
"""Return True if task is cancelling."""
return bool((cancelling_ := getattr(task, "cancelling", None)) and cancelling_())


def debug_enabled() -> bool:
"""Check if debug mode is enabled."""
try:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "hahomematic"
version = "2024.4.8"
version = "2024.4.9"
license = {text = "MIT License"}
description = "Homematic interface for Home Assistant running on Python 3."
readme = "README.md"
Expand Down
46 changes: 46 additions & 0 deletions tests/test_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

from __future__ import annotations

from collections.abc import Callable
from datetime import datetime, timedelta
from typing import Any
from unittest.mock import patch

import pytest

from hahomematic.caches.visibility import _get_value_from_dict_by_wildcard_key
from hahomematic.const import INIT_DATETIME, EntityUsage, ParameterType, SysvarType
from hahomematic.converter import _COMBINED_PARAMETER_TO_HM_CONVERTER, convert_hm_level_to_cpv
from hahomematic.exceptions import HaHomematicException
from hahomematic.platforms.support import (
_check_channel_name_with_channel_no,
Expand Down Expand Up @@ -421,3 +424,46 @@ def test_password() -> None:
assert check_password("test123TEST") is True
assert check_password("test.!$():;#-") is True
assert check_password("test%") is False


@pytest.mark.parametrize(
("parameter", "input_value", "converter", "result_value"),
[
(
"LEVEL_COMBINED",
0,
convert_hm_level_to_cpv,
"0x00",
),
(
"LEVEL_COMBINED",
0.17,
convert_hm_level_to_cpv,
"0x22",
),
(
"LEVEL_COMBINED",
0.81,
convert_hm_level_to_cpv,
"0xa2",
),
(
"LEVEL_COMBINED",
1,
convert_hm_level_to_cpv,
"0xc8",
),
],
)
def test_converter(
parameter: str,
input_value: Any,
converter: Callable,
result_value: Any,
) -> None:
"""Test device un ignore."""

assert input_value is not None
assert converter(input_value) == result_value
if re_converter := _COMBINED_PARAMETER_TO_HM_CONVERTER.get(parameter):
assert re_converter(result_value) == input_value

0 comments on commit 84965e1

Please sign in to comment.