Skip to content

Commit

Permalink
Fix mqtt issues
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemanspiff2007 committed Sep 24, 2023
1 parent fb9cb5b commit 3709d37
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 8 deletions.
21 changes: 14 additions & 7 deletions run/conf_testing/rules/test_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import time

import HABApp
from HABApp.core.connections import Connections, ConnectionStatus
from HABApp.core.events import ValueUpdateEventFilter
from HABApp.mqtt.connection.handler import CONNECTION_HANDLER
from HABApp.mqtt.events import MqttValueUpdateEventFilter
from HABApp.mqtt.items import MqttItem, MqttPairItem
from HABAppTests import EventWaiter, ItemWaiter, TestBaseRule, run_coro
Expand Down Expand Up @@ -63,21 +63,28 @@ def test_mqtt_item_creation(self):
topic = 'mqtt/item/creation'
assert HABApp.core.Items.item_exists(topic) is False

assert self.mqtt.publish(topic, 'asdf')
self.mqtt.publish(topic, 'asdf')
time.sleep(0.1)
assert HABApp.core.Items.item_exists(topic) is False

# We create the item only on retain
assert self.mqtt.publish(topic, 'asdf', retain=True)
self.mqtt.publish(topic, 'asdf', retain=True)
time.sleep(0.2)

run_coro(self.trigger_reconnect())

# We need to reconnect to receive the message
connection = CONNECTION_HANDLER.plugin_connection
run_coro(CONNECTION_HANDLER.on_disconnected(connection, connection.context))
run_coro(CONNECTION_HANDLER.on_connecting(connection, connection.context))
time.sleep(0.2)
connection = Connections.get('mqtt')
while not connection.is_online:
time.sleep(0.2)
assert HABApp.core.Items.item_exists(topic) is True

HABApp.core.Items.pop_item(topic)

async def trigger_reconnect(self):
connection = Connections.get('mqtt')
connection.status._set_manual(ConnectionStatus.DISCONNECTED)
connection.advance_status_task.start_if_not_running()


TestMQTTEvents()
3 changes: 3 additions & 0 deletions src/HABApp/mqtt/connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import HABApp
from HABApp.core.asyncio import AsyncContext
from HABApp.core.connections import BaseConnection, Connections, ConnectionStateToEventBusPlugin, AutoReconnectPlugin
from HABApp.core.connections.base_connection import AlreadyHandledException
from HABApp.core.connections.base_plugin import BaseConnectionPluginConnectedTask
from HABApp.core.const.const import PYTHON_310

Expand Down Expand Up @@ -67,5 +68,7 @@ async def _mqtt_wrap_task(self):
try:
with AsyncContext('MQTT'), connection.handle_exception(self.mqtt_task):
await self.mqtt_task()
except AlreadyHandledException:
pass
finally:
log.debug(f'{self.task.name} task stop')
2 changes: 1 addition & 1 deletion src/HABApp/mqtt/connection/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,4 @@ def publish(topic: Union[str, ItemRegistryItem], payload, qos: Optional[QOS] = N
:param qos: QoS, can be 0, 1 or 2. If not specified the value from configuration file will be used.
:param retain: retain message. If not specified the value from configuration file will be used.
"""
run_func_from_async(topic, payload, qos, retain)
run_func_from_async(async_publish, topic, payload, qos, retain)
6 changes: 6 additions & 0 deletions src/HABApp/mqtt/connection/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ async def apply_subscriptions(self):

async def on_connected(self):
await super().on_connected()

# Since we are freshly connected we have not yet subscribed to anything
# We need to clear this here because in case of error it might still have the topics
# from the last successful subscription in this dict
self.subscribed_to.clear()

self.sub_task.start_if_not_running()
await self.sub_task.wait()

Expand Down

0 comments on commit 3709d37

Please sign in to comment.