From 628ceaa3bf2d5878e52ec076038fc9c9d1cf1e07 Mon Sep 17 00:00:00 2001 From: spacemanspiff2007 <10754716+spacemanspiff2007@users.noreply.github.com> Date: Mon, 28 Oct 2024 13:10:18 +0100 Subject: [PATCH] - Added MqttTopicInfo - Fixed listen only for mqtt --- .pre-commit-config.yaml | 2 +- docs/interface_mqtt.rst | 33 +++++++++ run/conf_testing/rules/test_mqtt.py | 8 +++ src/HABApp/__version__.py | 2 +- src/HABApp/mqtt/__init__.py | 6 +- src/HABApp/mqtt/connection/publish.py | 17 +++-- src/HABApp/mqtt/mqtt_interface.py | 99 --------------------------- src/HABApp/mqtt/util/__init__.py | 1 + src/HABApp/mqtt/util/topic_info.py | 59 ++++++++++++++++ 9 files changed, 120 insertions(+), 107 deletions(-) delete mode 100644 src/HABApp/mqtt/mqtt_interface.py create mode 100644 src/HABApp/mqtt/util/__init__.py create mode 100644 src/HABApp/mqtt/util/topic_info.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3bc493d6..8042f512 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,7 +15,7 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.7.0 + rev: v0.7.1 hooks: - id: ruff name: ruff unused imports diff --git a/docs/interface_mqtt.rst b/docs/interface_mqtt.rst index ca856750..cac55b2b 100644 --- a/docs/interface_mqtt.rst +++ b/docs/interface_mqtt.rst @@ -162,6 +162,39 @@ and it will also trigger for :class:`~HABApp.mqtt.events.MqttValueChangeEvent`. :member-order: groupwise +Mqtt util +-------------------------------------- + +MqttTopicInfo +"""""""""""""""""""""""""""""""""""""" + +.. autoclass:: HABApp.mqtt.util.MqttTopicInfo + :members: + :inherited-members: + :member-order: groupwise + +.. exec_code:: + :hide_output: + + # ------------ hide: start ------------ + from rule_runner import SimpleRuleRunner + SimpleRuleRunner().set_up() + + import HABApp + from unittest.mock import MagicMock + HABApp.mqtt.util.topic_info.publish = MagicMock() + # ------------ hide: stop ------------- + from HABApp.mqtt.util import MqttTopicInfo + + topic = MqttTopicInfo('my/output/only/topic') + topic.publish('new_value') + + topic_qos = MqttTopicInfo('my/output/only/topic', qos=2) + topic_qos.publish('new_value') + + topic_qos_retain = topic_qos.replace(retain=True) + topic_qos_retain.publish('new_value') + Example MQTT rule -------------------------------------- diff --git a/run/conf_testing/rules/test_mqtt.py b/run/conf_testing/rules/test_mqtt.py index fc8ceeb6..dcd7a428 100644 --- a/run/conf_testing/rules/test_mqtt.py +++ b/run/conf_testing/rules/test_mqtt.py @@ -8,6 +8,7 @@ from HABApp.core.events import ValueUpdateEventFilter from HABApp.mqtt.events import MqttValueUpdateEventFilter from HABApp.mqtt.items import MqttItem, MqttPairItem +from HABApp.mqtt.util import MqttTopicInfo log = logging.getLogger('HABApp.MqttTestEvents') @@ -30,6 +31,7 @@ def __init__(self) -> None: self.add_test('MQTT item creation', self.test_mqtt_item_creation) self.add_test('MQTT pair item', self.test_mqtt_pair_item) + self.add_test('MQTT topic info', self.test_mqtt_topic_info) def test_mqtt_pair_item(self) -> None: topic_read = 'test/topic_read' @@ -88,5 +90,11 @@ async def trigger_reconnect(self) -> None: connection.status._set_manual(ConnectionStatus.DISCONNECTED) connection.advance_status_task.start_if_not_running() + def test_mqtt_topic_info(self) -> None: + t = MqttTopicInfo('test/event_topic') + with EventWaiter(t.topic, ValueUpdateEventFilter()) as waiter: + t.publish('asdf') + waiter.wait_for_event(value='asdf') + TestMQTTEvents() diff --git a/src/HABApp/__version__.py b/src/HABApp/__version__.py index ef57ecf9..198afa68 100644 --- a/src/HABApp/__version__.py +++ b/src/HABApp/__version__.py @@ -10,4 +10,4 @@ # Development versions contain the DEV-COUNTER postfix: # - 24.01.0.DEV-1 -__version__ = '24.09.0.DEV-10' +__version__ = '24.09.0.DEV-11' diff --git a/src/HABApp/mqtt/__init__.py b/src/HABApp/mqtt/__init__.py index cb74775d..80c89ce0 100644 --- a/src/HABApp/mqtt/__init__.py +++ b/src/HABApp/mqtt/__init__.py @@ -1,7 +1,9 @@ -from . import events -from . import items +from . import events, items, util + # isort: split + import HABApp.mqtt.interface_async import HABApp.mqtt.interface_sync + diff --git a/src/HABApp/mqtt/connection/publish.py b/src/HABApp/mqtt/connection/publish.py index 21944d55..dc4af7d1 100644 --- a/src/HABApp/mqtt/connection/publish.py +++ b/src/HABApp/mqtt/connection/publish.py @@ -1,4 +1,7 @@ from asyncio import Queue +from typing import Any + +from pydantic import BaseModel from HABApp.config import CONFIG from HABApp.config.models.mqtt import QOS @@ -13,6 +16,9 @@ def __init__(self) -> None: super().__init__(task_name='MqttPublish') async def mqtt_task(self) -> None: + if CONFIG.mqtt.general.listen_only: + return None + connection = self.plugin_connection with connection.handle_exception(self.mqtt_task): client = self.plugin_connection.context @@ -36,7 +42,8 @@ async def mqtt_task(self) -> None: async def on_connected(self) -> None: global QUEUE - QUEUE = Queue() + if not CONFIG.mqtt.general.listen_only: + QUEUE = Queue() await super().on_connected() async def on_disconnected(self) -> None: @@ -52,7 +59,7 @@ async def on_disconnected(self) -> None: PUBLISH_HANDLER = PublishHandler() -def async_publish(topic: str | ItemRegistryItem, payload, qos: QOS | None = None, +def async_publish(topic: str | ItemRegistryItem, payload: Any, qos: QOS | None = None, retain: bool | None = None) -> None: """ Publish a value under a certain topic. @@ -70,14 +77,16 @@ def async_publish(topic: str | ItemRegistryItem, payload, qos: QOS | None = None topic = topic.name # convert these to string - if isinstance(payload, (dict, list, set, frozenset)): + if isinstance(payload, BaseModel): + payload = payload.model_dump_json() + elif isinstance(payload, (dict, list, set, frozenset)): payload = dump_json(payload) queue.put_nowait((topic, payload, qos, retain)) return None -def publish(topic: str | ItemRegistryItem, payload, qos: QOS | None = None, retain: bool | None = None) -> None: +def publish(topic: str | ItemRegistryItem, payload: Any, qos: QOS | None = None, retain: bool | None = None) -> None: """ Publish a value under a certain topic. If qos and/or retain is not set the value from the configuration file will be used. diff --git a/src/HABApp/mqtt/mqtt_interface.py b/src/HABApp/mqtt/mqtt_interface.py deleted file mode 100644 index de470844..00000000 --- a/src/HABApp/mqtt/mqtt_interface.py +++ /dev/null @@ -1,99 +0,0 @@ -import typing - -import paho.mqtt.client as mqtt - -import HABApp -from HABApp.core.const.json import dump_json -from HABApp.mqtt.connection.connection import CONNECTION, log - - -def __is_connected() -> bool: - if CONNECTION.is_connected: - return True - - msg = 'Mqtt client not connected' - raise ConnectionError(msg) - - -def publish(topic: str, payload: typing.Any, - qos: int | None = None, retain: bool | None = None) -> int: - """ - Publish a value under a certain topic. - If qos and/or retain is not set the value from the configuration file will be used. - - :param topic: MQTT topic - :param payload: MQTT Payload - :param qos: QoS, can be 0, 1 or 2. If not specified value from configuration file will be used. - :param retain: retain message. If not specified value from configuration file will be used. - :return: 0 if successful - """ - - assert isinstance(topic, str), type(topic) - assert isinstance(qos, int) or qos is None, type(qos) - assert isinstance(retain, bool) or retain is None, type(retain) - - config = HABApp.config.CONFIG.mqtt - - if not __is_connected(): - return mqtt.MQTT_ERR_NO_CONN - if config.general.listen_only: - return 100 - - if qos is None: - qos = config.publish.qos - if retain is None: - retain = config.publish.retain - - # convert these to string - if isinstance(payload, (dict, list)): - payload = dump_json(payload) - - info = CONNECTION.client.publish(topic, payload, qos, retain) - if info.rc != mqtt.MQTT_ERR_SUCCESS: - log.error(f'Could not publish to "{topic}": {mqtt.error_string(info.rc)}') - return info - - -def subscribe(topic: str, qos: int | None = None) -> int: - """ - Subscribe to a MQTT topic. Note that subscriptions made this way are volatile and will only remain until - the next disconnect. - - :param topic: MQTT topic to subscribe to - :param qos: QoS, can be 0, 1 or 2. If not specified value from configuration file will be used. - :return: 0 if successful - """ - - assert isinstance(topic, str), type(topic) - assert isinstance(qos, int) or qos is None, type(qos) - - if not __is_connected(): - return mqtt.MQTT_ERR_NO_CONN - - # If no qos is specified load it from config - if qos is None: - qos = HABApp.config.CONFIG.mqtt.subscribe.qos - - res, mid = CONNECTION.client.subscribe(topic, qos) - if res != mqtt.MQTT_ERR_SUCCESS: - log.error(f'Could not subscribe to "{topic}": {mqtt.error_string(res)}') - return res - - -def unsubscribe(topic: str) -> int: - """ - Unsubscribe from a MQTT topic - - :param topic: MQTT topic - :return: 0 if successful - """ - - assert isinstance(topic, str), type(topic) - - if not __is_connected(): - return mqtt.MQTT_ERR_NO_CONN - - result, mid = CONNECTION.context.unsubscribe(topic) - if result != mqtt.MQTT_ERR_SUCCESS: - log.error(f'Could not unsubscribe from "{topic}": {mqtt.error_string(result)}') - return result diff --git a/src/HABApp/mqtt/util/__init__.py b/src/HABApp/mqtt/util/__init__.py new file mode 100644 index 00000000..832e28c3 --- /dev/null +++ b/src/HABApp/mqtt/util/__init__.py @@ -0,0 +1 @@ +from .topic_info import MqttTopicInfo diff --git a/src/HABApp/mqtt/util/topic_info.py b/src/HABApp/mqtt/util/topic_info.py new file mode 100644 index 00000000..af3ec161 --- /dev/null +++ b/src/HABApp/mqtt/util/topic_info.py @@ -0,0 +1,59 @@ +from typing import Any, Final + +from typing_extensions import Self + +from HABApp.mqtt.interface_sync import publish + + +class MqttTopicInfo: + """Allows to store the topic, qos and retain settings for a topic. These values can then be used to publish + """ + def __init__(self, topic: str, qos: int | None = None, retain: bool | None = None) -> None: + if not isinstance(topic, str): + raise TypeError() + if not topic: + raise ValueError() + + self._topic: Final = topic + self._qos: Final = qos + self._retain: Final = retain + + @property + def topic(self) -> str: + """The topic""" + return self._topic + + @property + def qos(self) -> int | None: + """QOS""" + return self._qos + + @property + def retain(self) -> bool | None: + """Retain""" + return self._retain + + def publish(self, payload: Any) -> None: + """ + Publish a payload + + :param payload: MQTT Payload + """ + + return publish(self._topic, payload, qos=self._qos, retain=self._retain) + + def replace(self, topic: str | None = None, qos: int | None = None, retain: bool | None = None) -> Self: + """ + Replace the topic, qos and retain with the given values and return a new object. + + :param topic: New topic (if provided) + :param qos: New qos (if provided) + :param retain: New retain (if provided) + :return: New object + """ + + return self.__class__( + topic if topic is not None else self._topic, + qos if qos is not None else self._qos, + retain if retain is not None else self._retain + )